You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2017/02/21 14:59:26 UTC
[1/4] incubator-edgent git commit: [Edgent-375] Add
IotpDevice.httpEvents(...)
Repository: incubator-edgent
Updated Branches:
refs/heads/master a2a29a9d7 -> b1b433716
[Edgent-375] Add IotpDevice.httpEvents(...)
Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/ede60042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/ede60042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/ede60042
Branch: refs/heads/master
Commit: ede6004229886ff1ddf40db449076acb5794657a
Parents: 261c7ee
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon Feb 20 17:59:26 2017 -0500
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon Feb 20 17:59:26 2017 -0500
----------------------------------------------------------------------
.../edgent/connectors/iotp/IotpDevice.java | 39 +++++++++++++++
.../connectors/iotp/runtime/IotpConnector.java | 23 ++++++++-
.../iotp/runtime/IotpDeviceHttpEventsFixed.java | 44 +++++++++++++++++
.../runtime/IotpDeviceHttpEventsFunction.java | 51 ++++++++++++++++++++
.../connectors/iotp/IotpQuickstart2.java | 21 +++++---
5 files changed, 171 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
----------------------------------------------------------------------
diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
index 007d883..f3a61fe 100644
--- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
+++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
@@ -31,6 +31,8 @@ import org.apache.edgent.connectors.iotp.runtime.IotpConnector;
import org.apache.edgent.connectors.iotp.runtime.IotpDeviceCommands;
import org.apache.edgent.connectors.iotp.runtime.IotpDeviceEventsFixed;
import org.apache.edgent.connectors.iotp.runtime.IotpDeviceEventsFunction;
+import org.apache.edgent.connectors.iotp.runtime.IotpDeviceHttpEventsFixed;
+import org.apache.edgent.connectors.iotp.runtime.IotpDeviceHttpEventsFunction;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.UnaryOperator;
import org.apache.edgent.topology.TSink;
@@ -252,6 +254,43 @@ public class IotpDevice implements IotDevice {
}
/**
+ * Publish a stream's tuples as device events using the WIoTP HTTP protocol.
+ * <p>
+ * Each tuple is published as a device event with the supplied functions
+ * providing the event identifier and payload from the tuple.
+ * The event identifier and payload can be generated based upon the tuple.
+ * The event is published with the equivalent of {@link QoS#AT_MOST_ONCE}.
+ *
+ * @param stream
+ * Stream to be published.
+ * @param eventId
+ * function to supply the event identifier.
+ * @param payload
+ * function to supply the event's payload.
+ * @return TSink sink element representing termination of this stream.
+ */
+ public TSink<JsonObject> httpEvents(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+ UnaryOperator<JsonObject> payload) {
+ return stream.sink(new IotpDeviceHttpEventsFunction(connector, eventId, payload));
+ }
+
+ /**
+ * Publish a stream's tuples as device events using the WIoTP HTTP protocol.
+ * <p>
+ * Each tuple is published as a device event with the fixed event identifier.
+ * The event is published with the equivalent of {@link QoS#AT_MOST_ONCE}.
+ *
+ * @param stream
+ * Stream to be published.
+ * @param eventId
+ * Event identifier.
+ * @return TSink sink element representing termination of this stream.
+ */
+ public TSink<JsonObject> httpEvents(TStream<JsonObject> stream, String eventId) {
+ return stream.sink(new IotpDeviceHttpEventsFixed(connector, eventId));
+ }
+
+ /**
* Create a stream of device commands as JSON objects.
* Each command sent to the device matching {@code commands} will result in a tuple
* on the stream. The JSON object has these keys:
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
----------------------------------------------------------------------
diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
index af16e61..998222f 100644
--- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
+++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
@@ -24,8 +24,11 @@ import java.io.Serializable;
import java.util.Properties;
import org.apache.edgent.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.gson.JsonObject;
+import com.ibm.iotf.client.api.APIClient.ContentType;
import com.ibm.iotf.client.device.Command;
import com.ibm.iotf.client.device.DeviceClient;
@@ -34,6 +37,7 @@ import com.ibm.iotf.client.device.DeviceClient;
*/
public class IotpConnector implements Serializable, AutoCloseable {
private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(IotpConnector.class);
private Properties options;
private File optionsFile;
@@ -98,7 +102,24 @@ public class IotpConnector implements Serializable, AutoCloseable {
throw new RuntimeException(e);
}
- client.publishEvent(eventId, event, qos);
+ boolean success = client.publishEvent(eventId, event, qos);
+ if (!success) {
+ // TODO log
+ }
+ }
+
+ void publishHttpEvent(String eventId, JsonObject event) {
+ try {
+ DeviceClient client = getClient();
+ client.api().publishDeviceEventOverHTTP(eventId, event, ContentType.json);
+ } catch (Exception e) {
+ // throw new RuntimeException(e);
+ // If the publish throws, a RuntimeException will cause
+ // everything to unwind and the app/topology can terminate.
+ // See the commentary/impl of MqttPublisher.accept().
+ // See EDGENT-382
+ logger.error("Unable to publish tuple for event " + eventId, e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java
----------------------------------------------------------------------
diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java
new file mode 100644
index 0000000..ea640b9
--- /dev/null
+++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.iotp.runtime;
+
+import org.apache.edgent.function.Consumer;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Consumer that publishes stream tuples as WIoTP device events over HTTP using a fixed event identifier.
+ *
+ */
+public class IotpDeviceHttpEventsFixed implements Consumer<JsonObject> {
+ private static final long serialVersionUID = 1L;
+ private final IotpConnector connector;
+ private final String eventId;
+
+ public IotpDeviceHttpEventsFixed(IotpConnector connector, String eventId) {
+ this.connector = connector;
+ this.eventId = eventId;
+ }
+
+ @Override
+ public void accept(JsonObject event) {
+ connector.publishHttpEvent(eventId, event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java
----------------------------------------------------------------------
diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java
new file mode 100644
index 0000000..5470bf3
--- /dev/null
+++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.iotp.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.UnaryOperator;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Consumer that publishes stream tuples as WIoTP device events over HTTP, with the
+ * event identifier and payload derived from each tuple by the supplied functions.
+ *
+ */
+public class IotpDeviceHttpEventsFunction implements Consumer<JsonObject> {
+
+ private static final long serialVersionUID = 1L;
+ private final IotpConnector connector;
+ private final Function<JsonObject, String> eventId;
+ private UnaryOperator<JsonObject> payload;
+
+ public IotpDeviceHttpEventsFunction(IotpConnector connector, Function<JsonObject, String> eventId,
+ UnaryOperator<JsonObject> payload) {
+ this.connector = connector;
+ this.payload = payload;
+ this.eventId = eventId;
+ }
+
+ @Override
+ public void accept(JsonObject event) {
+ connector.publishHttpEvent(eventId.apply(event), payload.apply(event));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
index eff81c2..9a41601 100644
--- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.edgent.samples.connectors.iotp;
+import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import org.apache.edgent.connectors.iot.IotDevice;
import org.apache.edgent.connectors.iot.QoS;
import org.apache.edgent.connectors.iotp.IotpDevice;
import org.apache.edgent.providers.direct.DirectProvider;
@@ -49,12 +50,14 @@ import com.ibm.iotf.devicemgmt.device.ManagedDevice;
* as it received by the Quickstart service.
* <P>
* This sample demonstrates using the WIoTP API to initialize the IotpDevice
- * connector.
+ * connector as well as the ability to publish events using the WIoTP HTTP protocol.
*/
public class IotpQuickstart2 {
public static void main(String[] args) throws Exception {
- boolean useDeviceClient = args.length > 0 && args[0].equals("useDeviceClient");
+ List<String> argList = Arrays.asList(args);
+ boolean useDeviceClient = argList.contains("useDeviceClient");
+ boolean useHttp = argList.contains("useHttp");
DirectProvider tp = new DirectProvider();
Topology topology = tp.newTopology("IotpQuickstart");
@@ -65,7 +68,7 @@ public class IotpQuickstart2 {
options.setProperty("org", "quickstart");
options.setProperty("type", IotpDevice.QUICKSTART_DEVICE_TYPE);
options.setProperty("id", deviceId);
- IotDevice device;
+ IotpDevice device;
if (useDeviceClient) {
System.out.println("Using WIoTP DeviceClient");
device = new IotpDevice(topology, new DeviceClient(options));
@@ -99,8 +102,14 @@ public class IotpQuickstart2 {
j.addProperty("objectTemp", v[2]);
return j;
});
-
- device.events(json, "sensors", QoS.FIRE_AND_FORGET);
+
+ if (!useHttp) {
+ device.events(json, "sensors", QoS.FIRE_AND_FORGET);
+ }
+ else {
+ System.out.println("Publishing events using HTTP");
+ device.httpEvents(json, "sensors");
+ }
tp.submit(topology);
}
[3/4] incubator-edgent git commit: add logging for some other publish
failure conditions
Posted by dl...@apache.org.
add logging for some other publish failure conditions
Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/56b9716e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/56b9716e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/56b9716e
Branch: refs/heads/master
Commit: 56b9716e50febac9cdf80331f7810b464326ce0e
Parents: 983a53b
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue Feb 21 08:41:56 2017 -0500
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue Feb 21 08:41:56 2017 -0500
----------------------------------------------------------------------
.../edgent/connectors/iotp/runtime/IotpConnector.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/56b9716e/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
----------------------------------------------------------------------
diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
index 7ece528..6639c40 100644
--- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
+++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.JsonObject;
+import com.ibm.iotf.client.api.APIClient;
import com.ibm.iotf.client.api.APIClient.ContentType;
import com.ibm.iotf.client.device.Command;
import com.ibm.iotf.client.device.DeviceClient;
@@ -102,20 +103,24 @@ public class IotpConnector implements Serializable, AutoCloseable {
throw new RuntimeException(e);
}
- client.publishEvent(eventId, event, qos);
+ if (!client.publishEvent(eventId, event, qos)) {
+ logger.error("Publish event failed for eventId {}", eventId);
+ }
}
void publishHttpEvent(String eventId, JsonObject event) {
try {
- DeviceClient client = getClient();
- client.api().publishDeviceEventOverHTTP(eventId, event, ContentType.json);
+ APIClient api = getClient().api();
+ if (!api.publishDeviceEventOverHTTP(eventId, event, ContentType.json)) {
+ logger.error("HTTP publish event failed for eventId {}", eventId);
+ }
} catch (Exception e) {
// throw new RuntimeException(e);
// If the publish throws, a RuntimeException will cause
// everything to unwind and the app/topology can terminate.
// See the commentary/impl of MqttPublisher.accept().
// See EDGENT-382
- logger.error("Unable to publish tuple for event " + eventId, e);
+ logger.error("Unable to publish event for eventId {}", eventId, e);
}
}
[2/4] incubator-edgent git commit: undo inadvertent change
Posted by dl...@apache.org.
undo inadvertent change
Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/983a53b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/983a53b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/983a53b4
Branch: refs/heads/master
Commit: 983a53b4f20fafd385d6b4924032655ac053f8c1
Parents: ede6004
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon Feb 20 18:02:07 2017 -0500
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon Feb 20 18:02:07 2017 -0500
----------------------------------------------------------------------
.../apache/edgent/connectors/iotp/runtime/IotpConnector.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/983a53b4/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
----------------------------------------------------------------------
diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
index 998222f..7ece528 100644
--- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
+++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
@@ -102,10 +102,7 @@ public class IotpConnector implements Serializable, AutoCloseable {
throw new RuntimeException(e);
}
- boolean success = client.publishEvent(eventId, event, qos);
- if (!success) {
- // TODO log
- }
+ client.publishEvent(eventId, event, qos);
}
void publishHttpEvent(String eventId, JsonObject event) {
[4/4] incubator-edgent git commit: Merge pull request #286
Posted by dl...@apache.org.
Merge pull request #286
This closes #286
Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/b1b43371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/b1b43371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/b1b43371
Branch: refs/heads/master
Commit: b1b4337167570159e1be900405e2ec7a95a94f6c
Parents: a2a29a9 56b9716
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue Feb 21 09:59:16 2017 -0500
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue Feb 21 09:59:16 2017 -0500
----------------------------------------------------------------------
.../edgent/connectors/iotp/IotpDevice.java | 39 +++++++++++++++
.../connectors/iotp/runtime/IotpConnector.java | 25 +++++++++-
.../iotp/runtime/IotpDeviceHttpEventsFixed.java | 44 +++++++++++++++++
.../runtime/IotpDeviceHttpEventsFunction.java | 51 ++++++++++++++++++++
.../connectors/iotp/IotpQuickstart2.java | 21 +++++---
5 files changed, 173 insertions(+), 7 deletions(-)
----------------------------------------------------------------------