You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2017/11/23 15:47:30 UTC
[5/9] incubator-edgent git commit: remove samples (now in separate
repo)
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart.java
deleted file mode 100644
index 3bd2414..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.iotp;
-
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.edgent.connectors.iot.IotDevice;
-import org.apache.edgent.connectors.iot.QoS;
-import org.apache.edgent.connectors.iotp.IotpDevice;
-import org.apache.edgent.providers.direct.DirectProvider;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-
-import com.google.gson.JsonObject;
-
-/**
- * IBM Watson IoT Platform Quickstart sample.
- * Submits a JSON device event every second using the
- * same format as the Quickstart device simulator,
- * with keys {@code temp}, {@code humidity} and {@code objectTemp}
- * and random values.
- * <P>
- * The device type is {@code iotsamples-edgent} and a random
- * device identifier is generated. Both are printed out when
- * the application starts.
- * </P>
- * A URL is also printed that allows viewing of the data
- * as it received by the Quickstart service.
- *
- * <p>See {@code scripts/connectors/iotp/README} for information about running the sample.
- */
-public class IotpQuickstart {
-
- public static void main(String[] args) {
-
- DirectProvider tp = new DirectProvider();
- Topology topology = tp.newTopology("IotpQuickstart");
-
- // Declare a connection to IoTF Quickstart service
- String deviceId = "qs" + Long.toHexString(new Random().nextLong());
- IotDevice device = IotpDevice.quickstart(topology, deviceId);
-
- System.out.println("Quickstart device type:" + IotpDevice.QUICKSTART_DEVICE_TYPE);
- System.out.println("Quickstart device id :" + deviceId);
- System.out.println("https://quickstart.internetofthings.ibmcloud.com/#/device/"
- + deviceId);
-
- Random r = new Random();
- TStream<double[]> raw = topology.poll(() -> {
- double[] v = new double[3];
-
- v[0] = r.nextGaussian() * 10.0 + 40.0;
- v[1] = r.nextGaussian() * 10.0 + 50.0;
- v[2] = r.nextGaussian() * 10.0 + 60.0;
-
- return v;
- }, 1, TimeUnit.SECONDS);
-
- TStream<JsonObject> json = raw.map(v -> {
- JsonObject j = new JsonObject();
- j.addProperty("temp", v[0]);
- j.addProperty("humidity", v[1]);
- j.addProperty("objectTemp", v[2]);
- return j;
- });
-
- device.events(json, "sensors", QoS.FIRE_AND_FORGET);
-
- tp.submit(topology);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
deleted file mode 100644
index ac4ee92..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.iotp;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.edgent.connectors.iot.QoS;
-import org.apache.edgent.connectors.iotp.IotpDevice;
-import org.apache.edgent.providers.direct.DirectProvider;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-
-import com.google.gson.JsonObject;
-import com.ibm.iotf.client.device.DeviceClient;
-import com.ibm.iotf.devicemgmt.DeviceData;
-import com.ibm.iotf.devicemgmt.device.ManagedDevice;
-
-/**
- * IBM Watson IoT Platform Quickstart sample.
- * Submits a JSON device event every second using the
- * same format as the Quickstart device simulator,
- * with keys {@code temp}, {@code humidity} and {@code objectTemp}
- * and random values.
- * <P>
- * The device type is {@code iotsamples-edgent} and a random
- * device identifier is generated. Both are printed out when
- * the application starts.
- * <P>
- * A URL is also printed that allows viewing of the data
- * as it received by the Quickstart service.
- * <P>
- * This sample demonstrates using the WIoTP API to initialize the IotpDevice
- * connector as well as the ability to publish events using the WIoTP HTTP protocol.
- *
- * <p>See {@code scripts/connectors/iotp/README} for information about running the sample.
- */
-public class IotpQuickstart2 {
-
- public static void main(String[] args) throws Exception {
- List<String> argList = Arrays.asList(args);
- boolean useDeviceClient = argList.contains("useDeviceClient");
- boolean useHttp = argList.contains("useHttp");
-
- DirectProvider tp = new DirectProvider();
- Topology topology = tp.newTopology("IotpQuickstart");
-
- // Declare a connector to IoTP Quickstart service, initializing with WIoTP API
- String deviceId = "qs" + Long.toHexString(new Random().nextLong());
- Properties options = new Properties();
- options.setProperty("org", "quickstart");
- options.setProperty("type", IotpDevice.QUICKSTART_DEVICE_TYPE);
- options.setProperty("id", deviceId);
- IotpDevice device;
- if (useDeviceClient) {
- System.out.println("Using WIoTP DeviceClient");
- device = new IotpDevice(topology, new DeviceClient(options));
- }
- else {
- System.out.println("Using WIoTP ManagedDevice");
- DeviceData deviceData = new DeviceData.Builder().build();
- device = new IotpDevice(topology, new ManagedDevice(options, deviceData));
- }
-
- System.out.println("Quickstart device type:" + IotpDevice.QUICKSTART_DEVICE_TYPE);
- System.out.println("Quickstart device id :" + deviceId);
- System.out.println("https://quickstart.internetofthings.ibmcloud.com/#/device/"
- + deviceId);
-
- Random r = new Random();
- TStream<double[]> raw = topology.poll(() -> {
- double[] v = new double[3];
-
- v[0] = r.nextGaussian() * 10.0 + 40.0;
- v[1] = r.nextGaussian() * 10.0 + 50.0;
- v[2] = r.nextGaussian() * 10.0 + 60.0;
-
- return v;
- }, 1, TimeUnit.SECONDS);
-
- TStream<JsonObject> json = raw.map(v -> {
- JsonObject j = new JsonObject();
- j.addProperty("temp", v[0]);
- j.addProperty("humidity", v[1]);
- j.addProperty("objectTemp", v[2]);
- return j;
- });
-
- if (!useHttp) {
- device.events(json, "sensors", QoS.FIRE_AND_FORGET);
- }
- else {
- System.out.println("Publishing events using HTTP");
- device.httpEvents(json, "sensors");
- }
-
- tp.submit(topology);
- }
- }
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpSensors.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpSensors.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpSensors.java
deleted file mode 100644
index e2f4b12..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpSensors.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.edgent.samples.connectors.iotp;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.edgent.connectors.iot.HeartBeat;
-import org.apache.edgent.connectors.iot.IotDevice;
-import org.apache.edgent.connectors.iot.QoS;
-import org.apache.edgent.connectors.iotp.IotpDevice;
-import org.apache.edgent.providers.direct.DirectProvider;
-import org.apache.edgent.providers.direct.DirectTopology;
-import org.apache.edgent.samples.topology.SensorsAggregates;
-import org.apache.edgent.topology.TStream;
-
-import com.google.gson.JsonObject;
-
-/**
- * Sample sending sensor device events to IBM Watson IoT Platform. <BR>
- * Simulates a couple of bursty sensors and sends the readings from the sensors
- * to IBM Watson IoT Platform as device events with id {@code sensors}. <BR>
- * Subscribes to device commands with identifier {@code display}.
- * <P>
- * In addition a device event with id {@code hearbeat} is sent
- * every minute. This ensure a connection attempt to IBM Watson IoT Platform
- * is made immediately rather than waiting for a bursty sensor to become
- * active.
- * <P>
- * This sample requires an IBM Watson IoT Platform service and a device configuration.<BR>
- * In order to see commands send from IBM Watson IoT Platform
- * there must be an analytic application
- * that sends commands with the identifier {@code display}.
- * </P>
- *
- * <p>See {@code scripts/connectors/iotp/README} for information about a
- * prototype device configuration file and running the sample.
- */
-public class IotpSensors {
-
- /**
- * Run the IotpSensors application.
- *
- * Takes a single argument that is the path to the
- * device configuration file containing the connection
- * authentication information.
- *
- * @param args Must contain the path to the device configuration file.
- *
- * @see IotpDevice#IotpDevice(org.apache.edgent.topology.Topology, File)
- */
- public static void main(String[] args) {
-
- String deviceCfg = args[0];
-
- DirectProvider tp = new DirectProvider();
- DirectTopology topology = tp.newTopology("IotpSensors");
-
- // Declare a connection to IoTF
- IotDevice device = new IotpDevice(topology, new File(deviceCfg));
-
- // Simulated sensors for this device.
- simulatedSensors(device, true);
-
- // Heartbeat
- heartBeat(device, true);
-
- // Subscribe to commands of id "display" for this
- // device and print them to standard out
- displayMessages(device, true);
-
- tp.submit(topology);
- }
-
-
- /**
- * Simulate two bursty sensors and send the readings as IoTF device events
- * with an identifier of {@code sensors}.
- *
- * @param device
- * IoT device
- * @param print
- * True if the data submitted as events should also be printed to
- * standard out.
- */
- public static void simulatedSensors(IotDevice device, boolean print) {
-
- TStream<JsonObject> sensors = SensorsAggregates.sensorsAB(device.topology());
- if (print)
- sensors.print();
-
- // Send the device streams as IoTF device events
- // with event identifier "sensors".
- device.events(sensors, "sensors", QoS.FIRE_AND_FORGET);
- }
-
- /**
- * Create a heart beat device event with
- * identifier {@code heartbeat} to
- * ensure there is some immediate output and
- * the connection to IoTF happens as soon as possible.
- * @param device IoT device
- * @param print true to print generated heartbeat tuples to System.out.
- */
- public static void heartBeat(IotDevice device, boolean print) {
- TStream<JsonObject> hbs =
- HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES, "heartbeat");
- if (print)
- hbs.print();
- }
-
-
- /**
- * Subscribe to IoTP device commands with identifier {@code display}.
- * Subscribing to device commands returns a stream of JSON objects that
- * include a timestamp ({@code tsms}), command identifier ({@code command})
- * and payload ({@code payload}). Payload is the application specific
- * portion of the command. <BR>
- * In this case the payload is expected to be a JSON object containing a
- * {@code msg} key with a string display message. <BR>
- * The returned stream consists of the display message string extracted from
- * the JSON payload.
- * <P>
- * Note to receive commands a analytic application must exist that generates
- * them through IBM Watson IoT Platform.
- * </P>
- *
- * @param device the device
- * @param print true to print the received command's payload to System.out.
- * @return the stream
- * @see IotDevice#commands(String...)
- */
- public static TStream<String> displayMessages(IotDevice device, boolean print) {
- // Subscribe to commands of id "display" for this device
- TStream<JsonObject> statusMsgs = device.commands("display");
-
- // The returned JSON object includes several fields
- // tsms - Timestamp in milliseconds (this is generic to a command)
- // payload.msg - Status message (this is specific to this application)
-
- // Map to a String object containing the message
- TStream<String> messages = statusMsgs.map(j -> j.getAsJsonObject("payload").getAsJsonPrimitive("msg").getAsString());
- if (print)
- messages.print();
- return messages;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/package-info.java
deleted file mode 100644
index 43dcfe9..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/package-info.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Samples showing use of the IBM Watson IoT Platform connector
- * to publish device events and subscribe to device
- * commands.
- *
- * <p>The "Quickstart" samples connect to the IBM Watson IoT Platform
- * using its Quickstart feature that does not require device registration.
- * When the samples are run they print out a URL which allows a browser
- * to see the data being sent from this sample.
- *
- * <p>The other samples connect to your IBM Watson IoT Platform service instance
- * using device and application registrations that you have created with your
- * service instance.
- *
- * <p>See each sample's Javadoc for more information.
- *
- * <p>See {@code scripts/connectors/iotp/README} for information about running the samples.
- */
-package org.apache.edgent.samples.connectors.iotp;
-
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
deleted file mode 100644
index a0264f1..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.jdbc;
-
-import java.lang.reflect.Method;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Properties;
-
-import javax.sql.DataSource;
-
-/**
- * Utilities for the sample's non-streaming JDBC database related actions.
- */
-public class DbUtils {
-
- /**
- * Get the JDBC {@link DataSource} for the database.
- * <p>
- * The "db.name" property specifies the name of the database.
- * Defaults to "JdbcConnectorSampleDb".
- *
- * @param props configuration properties
- * @return the DataSource
- * @throws Exception on failure
- */
- public static DataSource getDataSource(Properties props) throws Exception {
- return createDerbyEmbeddedDataSource(props);
- }
-
- /**
- * Initialize the sample's database.
- * <p>
- * Tables are created as needed and purged.
- * @param ds the DataSource
- * @throws Exception on failure
- */
- public static void initDb(DataSource ds) throws Exception {
- createTables(ds);
- purgeTables(ds);
- }
-
- /**
- * Purge the sample's tables
- * @param ds the DataSource
- * @throws Exception on failure
- */
- public static void purgeTables(DataSource ds) throws Exception {
- try (Connection cn = ds.getConnection()) {
- Statement stmt = cn.createStatement();
- stmt.execute("DELETE FROM persons");
- }
- }
-
- private static void createTables(DataSource ds) throws Exception {
- try (Connection cn = ds.getConnection()) {
- Statement stmt = cn.createStatement();
- stmt.execute("CREATE TABLE persons "
- + "("
- + "id INTEGER NOT NULL,"
- + "firstname VARCHAR(40) NOT NULL,"
- + "lastname VARCHAR(40) NOT NULL,"
- + "PRIMARY KEY (id)"
- + ")"
- );
- }
- catch (SQLException e) {
- if (e.getLocalizedMessage().contains("already exists"))
- return;
- else
- throw e;
- }
- }
-
- private static DataSource createDerbyEmbeddedDataSource(Properties props) throws Exception
- {
- String dbName = props.getProperty("db.name", "JdbcConnectorSampleDb");
-
- // For our sample, avoid a compile-time dependency to the jdbc driver.
- // At runtime, require that the classpath can find it.
-
- String DERBY_DATA_SOURCE = "org.apache.derby.jdbc.EmbeddedDataSource";
-
- Class<?> nsDataSource = null;
- try {
- nsDataSource = Class.forName(DERBY_DATA_SOURCE);
- }
- catch (ClassNotFoundException e) {
- String msg = "Fix the test classpath. ";
- if (System.getenv("DERBY_HOME") == null) {
- msg += "DERBY_HOME not set. ";
- }
- msg += "Class not found: "+e.getLocalizedMessage();
- System.err.println(msg);
- throw new IllegalStateException(msg);
- }
- DataSource ds = (DataSource) nsDataSource.newInstance();
-
- @SuppressWarnings("rawtypes")
- Class[] methodParams = new Class[] {String.class};
- Method dbname = nsDataSource.getMethod("setDatabaseName", methodParams);
- Object[] args = new Object[] {dbName};
- dbname.invoke(ds, args);
-
- // create the db if necessary
- Method create = nsDataSource.getMethod("setCreateDatabase", methodParams);
- args = new Object[] {"create"};
- create.invoke(ds, args);
-
- // set the user
- Method setuser = nsDataSource.getMethod("setUser", methodParams);
- args = new Object[] { props.getProperty("db.user", System.getProperty("user.name")) };
- setuser.invoke(ds, args);
-
- // optionally set the pw
- Method setpw = nsDataSource.getMethod("setPassword", methodParams);
- args = new Object[] { props.getProperty("db.password") };
- if (args[0] != null)
- setpw.invoke(ds, args);
-
- return ds;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
deleted file mode 100644
index bb57629..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.jdbc;
-
-/**
- * A Person object for the sample.
- */
-public class Person {
- int id;
- String firstName;
- String lastName;
- Person(int id, String first, String last) {
- this.id = id;
- this.firstName = first;
- this.lastName = last;
- }
- public String toString() {
- return String.format("id=%d first=%s last=%s",
- id, firstName, lastName);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
deleted file mode 100644
index f7f1211..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.jdbc;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-/**
- * Utilities for loading the sample's person data.
- */
-public class PersonData {
-
- /**
- * Load the person data from the path specified by the "persondata.path"
- * property.
- * @param props configuration properties
- * @return the loaded person data
- * @throws Exception on failure
- */
- public static List<Person> loadPersonData(Properties props) throws Exception {
- String pathname = props.getProperty("persondata.path");
- List<Person> persons = new ArrayList<>();
- Path path = new File(pathname).toPath();
- try (BufferedReader br = Files.newBufferedReader(path)) {
- int lineno = 0;
- String line;
- while ((line = br.readLine()) != null) {
- lineno++;
- Object[] fields = parseLine(line, lineno, pathname);
- if (fields == null)
- continue;
- persons.add(new Person((Integer)fields[0], (String)fields[1], (String)fields[2]));
- }
- }
- return persons;
- }
-
- private static Object[] parseLine(String line, int lineno, String pathname) {
- line = line.trim();
- if (line.startsWith("#"))
- return null;
-
- // id,firstName,lastName
- String[] items = line.split(",");
- if (items.length < 3)
- throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
- int id;
- try {
- id = new Integer(items[0]);
- if (id < 1)
- throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
- }
- catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
- }
-
- Object[] fields = new Object[3];
- fields[0] = id;
- fields[1] = items[1].trim();
- fields[2] = items[2].trim();
- return fields;
- }
-
- /**
- * Convert a {@code List<Person>} to a {@code List<PersonId>}
- * @param persons the person list
- * @return the person id list
- */
- public static List<PersonId> toPersonIds(List<Person> persons) {
- return persons.stream()
- .map(person -> new PersonId(person.id))
- .collect(Collectors.toList());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
deleted file mode 100644
index 218a027..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.jdbc;
-
-/**
- * Another class containing a person id for the sample.
- */
-public class PersonId {
- int id;
- PersonId(int id) {
- this.id = id;
- }
- public String toString() {
- return String.format("id=%d", id);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
deleted file mode 100644
index 006b459..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.jdbc;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.edgent.connectors.jdbc.JdbcStreams;
-import org.apache.edgent.providers.direct.DirectProvider;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-
-/**
- * A simple JDBC connector sample demonstrating streaming read access
- * of a dbms table and creating stream tuples from the results.
- */
-public class SimpleReaderApp {
- private final Properties props;
-
- public static void main(String[] args) throws Exception {
- if (args.length != 1)
- throw new Exception("missing pathname to jdbc.properties file");
- SimpleReaderApp reader = new SimpleReaderApp(args[0]);
- reader.run();
- }
-
- /**
- * @param jdbcPropsPath pathname to properties file
- */
- SimpleReaderApp(String jdbcPropsPath) throws Exception {
- props = new Properties();
- props.load(Files.newBufferedReader(new File(jdbcPropsPath).toPath()));
- }
-
- /**
- * Create a topology for the writer application and run it.
- */
- private void run() throws Exception {
- DirectProvider tp = new DirectProvider();
-
- // build the application/topology
-
- Topology t = tp.newTopology("jdbcSampleWriter");
-
- // Create the JDBC connector
- JdbcStreams myDb = new JdbcStreams(t,
- () -> DbUtils.getDataSource(props),
- dataSource -> dataSource.getConnection());
-
- // Create a sample stream of tuples containing a person id
- List<PersonId> personIdList = PersonData.toPersonIds(PersonData.loadPersonData(props));
- personIdList.add(new PersonId(99999));
- TStream<PersonId> personIds = t.collection(personIdList);
-
- // For each tuple on the stream, read info from the db table
- // using the "id", and create a Person tuple on the result stream.
- TStream<Person> persons = myDb.executeStatement(personIds,
- () -> "SELECT id, firstname, lastname FROM persons WHERE id = ?",
- (personId,stmt) -> stmt.setInt(1, personId.id),
- (personId,rSet,exc,resultStream) -> {
- if (exc != null) {
- // some failure processing this tuple. an error was logged.
- System.err.println("Unable to process id="+personId+": "+exc);
- return;
- }
- if (rSet.next()) {
- resultStream.accept(
- new Person(rSet.getInt("id"),
- rSet.getString("firstname"),
- rSet.getString("lastname")));
- }
- else {
- System.err.println("Unknown person id="+personId.id);
- }
- }
- );
-
- // print out Person tuples as they are retrieved
- persons.sink(person -> System.out.println("retrieved person: "+person));
-
- // run the application / topology
- tp.submit(t);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
deleted file mode 100644
index 018c97b..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.jdbc;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Properties;
-
-import org.apache.edgent.connectors.jdbc.JdbcStreams;
-import org.apache.edgent.providers.direct.DirectProvider;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-
-/**
- * A simple JDBC connector sample demonstrating streaming write access
- * of a dbms to add stream tuples to a table.
- */
-public class SimpleWriterApp {
- private final Properties props;
-
- public static void main(String[] args) throws Exception {
- if (args.length != 1)
- throw new Exception("missing pathname to jdbc.properties file");
- SimpleWriterApp writer = new SimpleWriterApp(args[0]);
- DbUtils.initDb(DbUtils.getDataSource(writer.props));
- writer.run();
- }
-
- /**
- * @param jdbcPropsPath pathname to properties file
- */
- SimpleWriterApp(String jdbcPropsPath) throws Exception {
- props = new Properties();
- props.load(Files.newBufferedReader(new File(jdbcPropsPath).toPath()));
- }
-
- /**
- * Create a topology for the writer application and run it.
- */
- private void run() throws Exception {
- DirectProvider tp = new DirectProvider();
-
- // build the application/topology
-
- Topology t = tp.newTopology("jdbcSampleWriter");
-
- // Create the JDBC connector
- JdbcStreams myDb = new JdbcStreams(t,
- () -> DbUtils.getDataSource(props),
- dataSource -> dataSource.getConnection());
-
- // Create a sample stream of Person tuples
- TStream<Person> persons = t.collection(PersonData.loadPersonData(props));
-
- // Write stream tuples to a table.
- myDb.executeStatement(persons,
- () -> "INSERT INTO persons VALUES(?,?,?)",
- (person,stmt) -> {
- System.out.println("Inserting into persons table: person "+person);
- stmt.setInt(1, person.id);
- stmt.setString(2, person.firstName);
- stmt.setString(3, person.lastName);
- }
- );
-
- // run the application / topology
- tp.submit(t);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
deleted file mode 100644
index 4e88b77..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Samples showing use of the
- * <a href="{@docRoot}/org/apache/edgent/connectors/jdbc/package-summary.html">
- * JDBC stream connector</a>.
- * <p>
- * See <edgent-release>/scripts/connectors/jdbc/README to run the samples.
- * <p>
- * The following samples are provided:
- * <ul>
- * <li>SimpleReaderApp.java - a simple dbms reader application topology</li>
- * <li>SimpleWriterApp.java - a simple dbms writer application topology</li>
- * </ul>
- */
-package org.apache.edgent.samples.connectors.jdbc;
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
deleted file mode 100644
index 7d5a530..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.kafka;
-
-import org.apache.edgent.samples.connectors.Options;
-
-/**
- * Demonstrate integrating with the Apache Kafka messaging system
- * <a href="http://kafka.apache.org">http://kafka.apache.org</a>.
- * <p>
- * {@link org.apache.edgent.connectors.kafka.KafkaProducer KafkaProducer} is
- * a connector used to create a bridge between topology streams
- * and publishing to Kafka topics.
- * <p>
- * {@link org.apache.edgent.connectors.kafka.KafkaConsumer KafkaConsumer} is
- * a connector used to create a bridge between topology streams
- * and subscribing to Kafka topics.
- * <p>
- * The client either publishes messages to a topic or
- * subscribes to the topic and reports the messages received.
- * <p>
- * By default, a running Kafka cluster with the following
- * characteristics is assumed:
- * <ul>
- * <li>{@code bootstrap.servers="localhost:9092"}</li>
- * <li>{@code zookeeper.connect="localhost:2181"}</li>
- * <li>kafka topic {@code "kafkaSampleTopic"} exists</li>
- * </ul>
- * <p>
- * See the Apache Kafka link above for information about setting up a Kafka
- * cluster as well as creating a topic.
- * <p>
- * This may be executed from as:
- * <UL>
- * <LI>
- * {@code java -cp samples/lib/org.apache.edgent.samples.connectors.kafka.jar
- * org.apache.edgent.samples.connectors.kafka.KafkaClient -h
- * } - Run directly from the command line.
- * </LI>
- * </UL>
- * <UL>
- * <LI>
- * An application execution within your IDE once you set the class path to include the correct jars.</LI>
- * </UL>
- */
-public class KafkaClient {
- private static final String usage = "usage: "
- + "\n" + "[-v] [-h]"
- + "\n" + "pub | sub"
- + "\n" + "[bootstrap.servers=<value>]"
- + "\n" + "[zookeeper.connect=<value>]"
- + "\n" + "[group.id=<value>]"
- + "\n" + "[pubcnt=<value>]"
- ;
-
- public static void main(String[] args) throws Exception {
- Options options = processArgs(args);
- if (options == null)
- return;
-
- Runner.run(options);
- }
-
- private static Options processArgs(String[] args) {
- Options options = new Options();
- initHandlers(options);
- try {
- options.processArgs(args);
- }
- catch (Exception e) {
- System.err.println(e);
- System.out.println(usage);
- return null;
- }
-
- if ((Boolean)options.get(OPT_HELP)) {
- System.out.println(usage);
- return null;
- }
-
- if (!(Boolean)options.get(OPT_PUB) && !(Boolean)options.get(OPT_SUB)) {
- System.err.println(String.format("Missing argument '%s' or '%s'.", OPT_PUB, OPT_SUB));
- System.out.println(usage);
- return null;
- }
-
- String[] announceOpts = new String[] {
- };
- if ((Boolean)options.get(OPT_VERBOSE))
- announceOpts = options.getAll().stream().map(e -> e.getKey()).toArray(String[]::new);
- for (String opt : announceOpts) {
- Object value = options.get(opt);
- if (value != null) {
- if (opt.toLowerCase().contains("password"))
- value = "*****";
- System.out.println("Using "+opt+"="+value);
- }
- }
-
- return options;
- }
-
- static final String OPT_VERBOSE = "-v";
- static final String OPT_HELP = "-h";
- static final String OPT_PUB = "pub";
- static final String OPT_SUB = "sub";
- static final String OPT_BOOTSTRAP_SERVERS = "bootstrap.servers";
- static final String OPT_ZOOKEEPER_CONNECT = "zookeeper.connect";
- static final String OPT_GROUP_ID = "group.id";
- static final String OPT_TOPIC = "topic";
- static final String OPT_PUB_CNT = "pubcnt";
-
- private static void initHandlers(Options opts) {
- // options for which we have a default
- opts.addHandler(OPT_HELP, null, false);
- opts.addHandler(OPT_VERBOSE, null, false);
- opts.addHandler(OPT_PUB, null, false);
- opts.addHandler(OPT_SUB, null, false);
- opts.addHandler(OPT_BOOTSTRAP_SERVERS, v -> v, "localhost:9092");
- opts.addHandler(OPT_ZOOKEEPER_CONNECT, v -> v, "localhost:2181");
- opts.addHandler(OPT_TOPIC, v -> v, "kafkaSampleTopic");
- opts.addHandler(OPT_PUB_CNT, v -> Integer.valueOf(v), -1);
-
- // optional options (no default value)
- opts.addHandler(OPT_GROUP_ID, v -> v);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
deleted file mode 100644
index 6746a7d..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.kafka;
-
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_BOOTSTRAP_SERVERS;
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_PUB_CNT;
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.edgent.samples.connectors.MsgSupplier;
-import org.apache.edgent.samples.connectors.Options;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-import org.apache.edgent.topology.TopologyProvider;
-
-import org.apache.edgent.connectors.kafka.KafkaProducer;
-
-/**
- * A Kafka producer/publisher topology application.
- */
-public class PublisherApp {
- private final TopologyProvider tp;
- private final Options options;
-
- /**
- * @param tp the TopologyProvider to use.
- * @param options
- */
- PublisherApp(TopologyProvider tp, Options options) {
- this.tp = tp;
- this.options = options;
- }
-
- /**
- * Create a topology for the publisher application.
- * @return the Topology
- */
- public Topology buildAppTopology() {
- Topology t = tp.newTopology("kafkaClientPublisher");
-
- // Create a sample stream of tuples to publish
- TStream<String> msgs = t.poll(new MsgSupplier(options.get(OPT_PUB_CNT)),
- 1L, TimeUnit.SECONDS);
-
- // Create the KafkaProducer broker connector
- Map<String,Object> config = newConfig();
- KafkaProducer kafka = new KafkaProducer(t, () -> config);
-
- // Publish the stream to the topic. The String tuple is the message value.
- kafka.publish(msgs, options.get(OPT_TOPIC));
-
- return t;
- }
-
- private Map<String,Object> newConfig() {
- Map<String,Object> config = new HashMap<>();
- // required kafka configuration items
- config.put("bootstrap.servers", options.get(OPT_BOOTSTRAP_SERVERS));
- return config;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
deleted file mode 100644
index 6554f8b..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
+++ /dev/null
@@ -1,26 +0,0 @@
-Sample Kafka Publisher and Subscriber topology applications.
-
-By default the samples assume the following kafka broker configuration:
-- bootstrap.servers="localhost:9092"
-- zookeeper.connect="localhost:2181"
-- kafka topic "kafkaSampleTopic" exists
-- no authentication
-
-See http://kafka.apache.org for the code and setup information for
-a Kafka broker.
-
-see scripts/connectors/kafka/README to run them
-
-The simple sample
------------------
-
-SimplePublisherApp.java - build and run the simple publisher application topology
-SimpleSubscriberApp.java - build and run the simple subscriber application topology
-
-The fully configurable client
------------------------------
-
-Runner.java - build and run the publisher or subscriber
-PublisherApp.java - build the publisher application topology
-SubscriberApp.java - build the subscriber application topology
-KafkaClient.java - the client's command line interface
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
deleted file mode 100644
index 2ffccfc..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.kafka;
-
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_BOOTSTRAP_SERVERS;
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_PUB;
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC;
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_ZOOKEEPER_CONNECT;
-
-import org.apache.edgent.console.server.HttpServer;
-import org.apache.edgent.providers.development.DevelopmentProvider;
-import org.apache.edgent.samples.connectors.Options;
-import org.apache.edgent.topology.Topology;
-
-/**
- * Build and run the publisher or subscriber application.
- */
-public class Runner {
- /**
- * Build and run the publisher or subscriber application.
- * @param options command line options
- * @throws Exception on failure
- */
- public static void run(Options options) throws Exception {
- boolean isPub = options.get(OPT_PUB);
-
- // Get a topology runtime provider
- DevelopmentProvider tp = new DevelopmentProvider();
-
- Topology top;
- if (isPub) {
- PublisherApp publisher = new PublisherApp(tp, options);
- top = publisher.buildAppTopology();
- }
- else {
- SubscriberApp subscriber = new SubscriberApp(tp, options);
- top = subscriber.buildAppTopology();
- }
-
- // Submit the app/topology; send or receive the messages.
- System.out.println(
- "Using Kafka cluster at bootstrap.servers="
- + options.get(OPT_BOOTSTRAP_SERVERS)
- + " zookeeper.connect=" + options.get(OPT_ZOOKEEPER_CONNECT)
- + "\n" + (isPub ? "Publishing" : "Subscribing")
- + " to topic " + options.get(OPT_TOPIC));
- System.out.println("Console URL for the job: "
- + tp.getServices().getService(HttpServer.class).getConsoleUrl());
- tp.submit(top);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
deleted file mode 100644
index a8b9492..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.kafka;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.edgent.console.server.HttpServer;
-import org.apache.edgent.providers.development.DevelopmentProvider;
-import org.apache.edgent.samples.connectors.Util;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-
-import org.apache.edgent.connectors.kafka.KafkaProducer;
-
-/**
- * A simple Kafka publisher topology application.
- */
-public class SimplePublisherApp {
- private final Properties props;
- private final String topic;
-
- public static void main(String[] args) throws Exception {
- if (args.length != 1)
- throw new Exception("missing pathname to kafka.properties file");
- SimplePublisherApp publisher = new SimplePublisherApp(args[0]);
- publisher.run();
- }
-
- /**
- * @param kafkaPropsPath pathname to properties file
- */
- SimplePublisherApp(String kafkaPropsPath) throws Exception {
- props = new Properties();
- props.load(Files.newBufferedReader(new File(kafkaPropsPath).toPath()));
- topic = props.getProperty("topic");
- }
-
- private Map<String,Object> createKafkaConfig() {
- Map<String,Object> kafkaConfig = new HashMap<>();
- kafkaConfig.put("bootstrap.servers", props.get("bootstrap.servers"));
- return kafkaConfig;
- }
-
- /**
- * Create a topology for the publisher application and run it.
- */
- private void run() throws Exception {
- DevelopmentProvider tp = new DevelopmentProvider();
-
- // build the application/topology
-
- Topology t = tp.newTopology("kafkaSamplePublisher");
-
- // Create the Kafka Producer broker connector
- Map<String,Object> kafkaConfig = createKafkaConfig();
- KafkaProducer kafka = new KafkaProducer(t, () -> kafkaConfig);
-
- // Create a sample stream of tuples to publish
- AtomicInteger cnt = new AtomicInteger();
- TStream<String> msgs = t.poll(
- () -> {
- String msg = String.format("Message-%d from %s",
- cnt.incrementAndGet(), Util.simpleTS());
- System.out.println("poll generated msg to publish: " + msg);
- return msg;
- }, 1L, TimeUnit.SECONDS);
-
- // Publish the stream to the topic. The String tuple is the message value.
- kafka.publish(msgs, topic);
-
- // run the application / topology
- System.out.println("Console URL for the job: "
- + tp.getServices().getService(HttpServer.class).getConsoleUrl());
- tp.submit(t);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
deleted file mode 100644
index 7cef424..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.kafka;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.edgent.console.server.HttpServer;
-import org.apache.edgent.providers.development.DevelopmentProvider;
-import org.apache.edgent.samples.connectors.Util;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-
-import org.apache.edgent.connectors.kafka.KafkaConsumer;
-
-/**
- * A simple Kafka subscriber topology application.
- */
-public class SimpleSubscriberApp {
- private final Properties props;
- private final String topic;
-
- public static void main(String[] args) throws Exception {
- if (args.length != 1)
- throw new Exception("missing pathname to kafka.properties file");
- SimpleSubscriberApp subscriber = new SimpleSubscriberApp(args[0]);
- subscriber.run();
- }
-
- /**
- * @param kafkaPropsPath pathname to properties file
- */
- SimpleSubscriberApp(String kafkaPropsPath) throws Exception {
- props = new Properties();
- props.load(Files.newBufferedReader(new File(kafkaPropsPath).toPath()));
- topic = props.getProperty("topic");
- }
-
- private Map<String,Object> createKafkaConfig() {
- Map<String,Object> kafkaConfig = new HashMap<>();
- kafkaConfig.put("zookeeper.connect", props.get("zookeeper.connect"));
- // for the sample, be insensitive to old/multiple consumers for
- // the topic/groupId hanging around
- kafkaConfig.put("group.id",
- "kafkaSampleConsumer_" + Util.simpleTS().replaceAll(":", ""));
- return kafkaConfig;
- }
-
- /**
- * Create a topology for the subscriber application and run it.
- */
- private void run() throws Exception {
- DevelopmentProvider tp = new DevelopmentProvider();
-
- // build the application/topology
-
- Topology t = tp.newTopology("kafkaSampleSubscriber");
-
- // Create the Kafka Consumer broker connector
- Map<String,Object> kafkaConfig = createKafkaConfig();
- KafkaConsumer kafka = new KafkaConsumer(t, () -> kafkaConfig);
-
- // Subscribe to the topic and create a stream of messages
- TStream<String> msgs = kafka.subscribe(rec -> rec.value(), topic);
-
- // Process the received msgs - just print them out
- msgs.sink(tuple -> System.out.println(
- String.format("[%s] received: %s", Util.simpleTS(), tuple)));
-
- // run the application / topology
- System.out.println("Console URL for the job: "
- + tp.getServices().getService(HttpServer.class).getConsoleUrl());
- tp.submit(t);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SubscriberApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SubscriberApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SubscriberApp.java
deleted file mode 100644
index 7405f39..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SubscriberApp.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.kafka;
-
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_GROUP_ID;
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC;
-import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_ZOOKEEPER_CONNECT;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.edgent.samples.connectors.Options;
-import org.apache.edgent.samples.connectors.Util;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-import org.apache.edgent.topology.TopologyProvider;
-
-import org.apache.edgent.connectors.kafka.KafkaConsumer;
-
-/**
- * A Kafka consumer/subscriber topology application.
- */
-public class SubscriberApp {
- private final TopologyProvider tp;
- private final Options options;
- private final String uniq = Util.simpleTS();
-
- /**
- * @param top the TopologyProvider to use.
- * @param options
- */
- SubscriberApp(TopologyProvider tp, Options options) {
- this.tp = tp;
- this.options = options;
- }
-
- /**
- * Create a topology for the subscriber application.
- * @return the Topology
- */
- public Topology buildAppTopology() {
- Topology t = tp.newTopology("kafkaClientSubscriber");
-
- // Create the KafkaConsumer broker connector
- Map<String,Object> config = newConfig(t);
- KafkaConsumer kafka = new KafkaConsumer(t, () -> config);
-
- System.out.println("Using Kafka consumer group.id "
- + config.get(OPT_GROUP_ID));
-
- // Subscribe to the topic and create a stream of messages
- TStream<String> msgs = kafka.subscribe(rec -> rec.value(),
- (String)options.get(OPT_TOPIC));
-
- // Process the received msgs - just print them out
- msgs.sink(tuple -> System.out.println(
- String.format("[%s] received: %s", Util.simpleTS(), tuple)));
-
- return t;
- }
-
- private Map<String,Object> newConfig(Topology t) {
- Map<String,Object> config = new HashMap<>();
- // required kafka configuration items
- config.put("zookeeper.connect", options.get(OPT_ZOOKEEPER_CONNECT));
- config.put("group.id", options.get(OPT_GROUP_ID, newGroupId(t.getName())));
- return config;
- }
-
- private String newGroupId(String name) {
- // be insensitive to old consumers for the topic/groupId hanging around
- String groupId = name + "_" + uniq.replaceAll(":", "");
- return groupId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/package-info.java
deleted file mode 100644
index 761d053..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/package-info.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Samples showing use of the
- * <a href="{@docRoot}/org/apache/edgent/connectors/kafka/package-summary.html">
- * Apache Kafka stream connector</a>.
- * <p>
- * See <edgent-release>/scripts/connectors/kafka/README to run the samples.
- * <p>
- * The following simple samples are provided:
- * <ul>
- * <li>SimplePublisherApp.java - a simple publisher application topology</li>
- * <li>SimpleSubscriberApp.java - a simple subscriber application topology</li>
- * </ul>
- * The remaining classes are part of a sample that more fully exposes
- * controlling various configuration options.
- */
-package org.apache.edgent.samples.connectors.kafka;
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/MqttClient.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/MqttClient.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/MqttClient.java
deleted file mode 100644
index 9cf6c37..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/MqttClient.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.mqtt;
-
-import org.apache.edgent.samples.connectors.Options;
-
-/**
- * Demonstrate integrating with the MQTT messaging system
- * <a href="http://mqtt.org">http://mqtt.org</a>.
- * <p>
- * {@link org.apache.edgent.connectors.mqtt.MqttStreams MqttStreams} is
- * a connector used to create a bridge between topology streams
- * and an MQTT broker.
- * <p>
- * The client either publishes some messages to a MQTT topic
- * or subscribes to the topic and reports the messages received.
- * <p>
- * By default, a running MQTT broker with the following
- * characteristics is assumed:
- * <ul>
- * <li>the broker's connection is {@code tcp://localhost:1883}</li>
- * <li>the broker is configured for no authentication</li>
- * </ul>
- * <p>
- * See the MQTT link above for information about setting up a MQTT broker.
- * <p>
- * This may be executed as:
- * <UL>
- * <LI>
- * {@code java -cp samples/lib/org.apache.edgent.samples.connectors.mqtt.jar
- * org.apache.edgent.samples.connectors.mqtt.MqttClient -h
- * } - Run directly from the command line.
- * </LI>
- * <LI>
- * Specify absolute pathnames if using the {@code trustStore}
- * or {@code keyStore} arguments.
- * </LI>
- * <LI>
- * An application execution within your IDE once you set the class path to include the correct jars.
- * </LI>
- * </UL>
- */
-public class MqttClient {
- private static final String usage = "usage: "
- + "\n" + "[-v] [-h]"
- + "\n" + "pub | sub"
- + "\n" + "[serverURI=<value>]"
- + "\n" + "[clientId=<value>]"
- + "\n" + "[cleanSession=<true|false>]"
- + "\n" + "[topic=<value>] [qos=<value>]"
- + "\n" + "[retain]"
- + "\n" + "[pubcnt=<value>]"
- + "\n" + "[cnTimeout=<value>]"
- + "\n" + "[actionTimeoutMillis=<value>]"
- + "\n" + "[idleTimeout=<value>]"
- + "\n" + "[idleReconnectInterval=<value>]"
- + "\n" + "[userID=<value>] [password=<value>]"
- + "\n" + "[trustStore=<value>] [trustStorePassword=<value>]"
- + "\n" + "[keyStore=<value>] [keyStorePassword=<value>]"
- ;
-
- public static void main(String[] args) throws Exception {
- Options options = processArgs(args);
- if (options == null)
- return;
-
- Runner.run(options);
- }
-
- private static Options processArgs(String[] args) {
- Options options = new Options();
- initHandlers(options);
- try {
- options.processArgs(args);
- }
- catch (Exception e) {
- System.err.println(e);
- System.out.println(usage);
- return null;
- }
-
- if ((Boolean)options.get(OPT_HELP)) {
- System.out.println(usage);
- return null;
- }
-
- if (!(Boolean)options.get(OPT_PUB) && !(Boolean)options.get(OPT_SUB)) {
- System.err.println(String.format("Missing argument '%s' or '%s'.", OPT_PUB, OPT_SUB));
- System.out.println(usage);
- return null;
- }
-
- if (options.get(OPT_PASSWORD) != null)
- options.put(OPT_USER_ID, options.get(OPT_USER_ID, System.getProperty("user.name")));
-
- String[] announceOpts = new String[] {
- OPT_USER_ID,
- OPT_PASSWORD,
- OPT_TRUST_STORE,
- OPT_TRUST_STORE_PASSWORD,
- OPT_KEY_STORE,
- OPT_KEY_STORE_PASSWORD
- };
- if ((Boolean)options.get(OPT_VERBOSE))
- announceOpts = options.getAll().stream().map(e -> e.getKey()).toArray(String[]::new);
- for (String opt : announceOpts) {
- Object value = options.get(opt);
- if (value != null) {
- if (opt.toLowerCase().contains("password"))
- value = "*****";
- System.out.println("Using "+opt+"="+value);
- }
- }
-
- return options;
- }
-
- static final String OPT_VERBOSE = "-v";
- static final String OPT_HELP = "-h";
- static final String OPT_PUB = "pub";
- static final String OPT_SUB = "sub";
- static final String OPT_SERVER_URI = "serverURI";
- static final String OPT_CLIENT_ID = "clientId";
- static final String OPT_CN_TIMEOUT_SEC = "cnTimeout";
- static final String OPT_ACTION_TIMEOUT_MILLIS = "actionTimeoutMillis";
- static final String OPT_QOS = "qos";
- static final String OPT_TOPIC = "topic";
- static final String OPT_CLEAN_SESSION = "cleanSession";
- static final String OPT_RETAIN = "retain";
- static final String OPT_USER_ID = "userID";
- static final String OPT_PASSWORD = "password";
- static final String OPT_TRUST_STORE = "trustStore";
- static final String OPT_TRUST_STORE_PASSWORD = "trustStorePassword";
- static final String OPT_KEY_STORE = "keyStore";
- static final String OPT_KEY_STORE_PASSWORD = "keyStorePassword";
- static final String OPT_PUB_CNT = "pubcnt";
- static final String OPT_IDLE_TIMEOUT_SEC = "idleTimeout";
- static final String OPT_IDLE_RECONNECT_INTERVAL_SEC = "idleReconnectInterval";
-
- private static void initHandlers(Options opts) {
- // options for which we have a default
- opts.addHandler(OPT_HELP, null, false);
- opts.addHandler(OPT_VERBOSE, null, false);
- opts.addHandler(OPT_PUB, null, false);
- opts.addHandler(OPT_SUB, null, false);
- opts.addHandler(OPT_SERVER_URI, v -> v, "tcp://localhost:1883");
- opts.addHandler(OPT_TOPIC, v -> v, "mqttSampleTopic");
- opts.addHandler(OPT_RETAIN, null, false);
- opts.addHandler(OPT_PUB_CNT, v -> Integer.valueOf(v), -1);
- opts.addHandler(OPT_QOS, v -> Integer.valueOf(v), 0);
-
- // optional options (no default value)
- opts.addHandler(OPT_CLIENT_ID, v -> v);
- opts.addHandler(OPT_CN_TIMEOUT_SEC, v -> Integer.valueOf(v));
- opts.addHandler(OPT_ACTION_TIMEOUT_MILLIS, v -> Long.valueOf(v));
- opts.addHandler(OPT_CLEAN_SESSION, v -> Boolean.valueOf(v));
- opts.addHandler(OPT_USER_ID, v -> v);
- opts.addHandler(OPT_PASSWORD, v -> v);
- opts.addHandler(OPT_TRUST_STORE, v -> v);
- opts.addHandler(OPT_TRUST_STORE_PASSWORD, v -> v);
- opts.addHandler(OPT_KEY_STORE, v -> v);
- opts.addHandler(OPT_KEY_STORE_PASSWORD, v -> v);
- opts.addHandler(OPT_IDLE_TIMEOUT_SEC, v -> Integer.valueOf(v));
- opts.addHandler(OPT_IDLE_RECONNECT_INTERVAL_SEC, v -> Integer.valueOf(v));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/PublisherApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/PublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/PublisherApp.java
deleted file mode 100644
index 4be6ce5..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/PublisherApp.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.samples.connectors.mqtt;
-
-import static org.apache.edgent.samples.connectors.mqtt.MqttClient.OPT_PUB_CNT;
-import static org.apache.edgent.samples.connectors.mqtt.MqttClient.OPT_QOS;
-import static org.apache.edgent.samples.connectors.mqtt.MqttClient.OPT_RETAIN;
-import static org.apache.edgent.samples.connectors.mqtt.MqttClient.OPT_TOPIC;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.edgent.connectors.mqtt.MqttConfig;
-import org.apache.edgent.connectors.mqtt.MqttStreams;
-import org.apache.edgent.samples.connectors.MsgSupplier;
-import org.apache.edgent.samples.connectors.Options;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-import org.apache.edgent.topology.TopologyProvider;
-
-/**
- * A MQTT publisher topology application.
- */
-public class PublisherApp {
- private final TopologyProvider tp;
- private final Options options;
-
- /**
- * @param tp the TopologyProvider to use.
- * @param options
- */
- PublisherApp(TopologyProvider tp, Options options) {
- this.tp = tp;
- this.options = options;
- }
-
- /**
- * Create a topology for the publisher application.
- * @return the Topology
- */
- public Topology buildAppTopology() {
- Topology t = tp.newTopology("mqttClientPublisher");
-
- // Create a sample stream of tuples to publish
- TStream<String> msgs = t.poll(new MsgSupplier(options.get(OPT_PUB_CNT)),
- 1L, TimeUnit.SECONDS);
-
- // Create the MQTT broker connector
- MqttConfig config= Runner.newConfig(options);
- MqttStreams mqtt = new MqttStreams(t, () -> config);
-
- // Publish the stream to the topic. The String tuple is the message value.
- mqtt.publish(msgs, options.get(OPT_TOPIC),
- options.get(OPT_QOS), options.get(OPT_RETAIN));
-
- return t;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/README
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/README b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/README
deleted file mode 100644
index 7760f50..0000000
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/README
+++ /dev/null
@@ -1,24 +0,0 @@
-Sample MQTT Publisher and Subscriber topology applications.
-
-By default, the following MQTT broker configuration is assumed:
-- the broker's connection URL is tcp://localhost:1883
-- the broker is configured for no authentication
-
-See http://mqtt.org for the code and setup information for
-a mqtt broker.
-
-see scripts/connectors/mqtt/README to run them
-
-The simple sample
------------------
-
-SimplePublisherApp.java - build and run the simple publisher application topology
-SimpleSubscriberApp.java - build and run the simple subscriber application topology
-
-The fully configurable clients
-------------------------------
-
-Runner.java - build and run the publisher or subscriber
-PublisherApp.java - build the publisher application topology
-SubscriberApp.java - build the subscriber application topology
-MqttClient.java - the client's command line interface