You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/03/10 07:16:32 UTC
[incubator-streampipes-extensions] branch dev updated: Add ISS
adapter
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 9a123d6 Add ISS adapter
new 0d07e07 Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
9a123d6 is described below
commit 9a123d6de6de916797dbf98bacab16f42675f281
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue Mar 10 08:15:58 2020 +0100
Add ISS adapter
---
pom.xml | 10 +-
.../streampipes/connect/ConnectAdapterInit.java | 4 +-
.../connect/adapters/iss/IssAdapter.java | 147 +++++++++++++++++++++
.../connect/adapters/iss/model/IssModel.java | 42 ++++++
.../connect/adapters/iss/model/IssPosition.java | 33 +++++
.../documentation.md | 0
.../strings.en | 5 +
.../strings.en | 4 +-
.../documentation.md | 16 +--
9 files changed, 239 insertions(+), 22 deletions(-)
diff --git a/pom.xml b/pom.xml
index ab3350b..3faa4e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -554,11 +554,11 @@
</rules>
</configuration>
</plugin>
- <!-- <plugin>-->
- <!-- <groupId>org.apache.streampipes</groupId>-->
- <!-- <artifactId>streampipes-maven-plugin</artifactId>-->
- <!-- <version>1.0-SNAPSHOT</version>-->
- <!-- </plugin>-->
+ <plugin>
+ <groupId>org.streampipes</groupId>
+ <artifactId>streampipes-maven-plugin</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index b3e2e91..ff9e279 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect;
+import org.apache.streampipes.connect.adapters.iss.IssAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
import org.apache.streampipes.connect.adapters.ti.TISensorTag;
@@ -84,7 +85,8 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
.add(new TISensorTag())
.add(new NetioRestAdapter())
.add(new NetioMQTTAdapter())
- .add(new Plc4xS7Adapter());
+ .add(new Plc4xS7Adapter())
+ .add(new IssAdapter());
String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl();
String masterUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerMasterUrl();
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/IssAdapter.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/IssAdapter.java
new file mode 100644
index 0000000..51a2985
--- /dev/null
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/IssAdapter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.connect.adapters.iss;
+
+import static org.apache.streampipes.sdk.helpers.EpProperties.doubleEp;
+import static org.apache.streampipes.sdk.helpers.EpProperties.timestampProperty;
+
+import com.google.gson.Gson;
+import org.apache.http.client.fluent.Request;
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
+import org.apache.streampipes.connect.adapter.util.PollingSettings;
+import org.apache.streampipes.connect.adapters.PullAdapter;
+import org.apache.streampipes.connect.adapters.iss.model.IssModel;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
+import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.Geo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class IssAdapter extends PullAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IssAdapter.class);
+
+ public static final String ID = "org.apache.streampipes.connect.adapters.iss";
+
+ private static final String POLLING_INTERVAL_KEY = "polling-interval";
+ private static final String ISS_ENDPOINT_URL = "http://api.open-notify.org/iss-now.json";
+
+ private static final String Timestamp = "timestamp";
+ private static final String Latitude = "latitude";
+ private static final String Longitude = "longitude";
+
+ private Integer pollingIntervalInSeconds;
+
+
+ public IssAdapter() {
+ super();
+ }
+
+ public IssAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
+ super(adapterStreamDescription);
+ }
+
+ @Override
+ protected void before() throws AdapterException {
+ ParameterExtractor extractor = new ParameterExtractor(adapterDescription.getConfig());
+ this.pollingIntervalInSeconds = extractor.singleValue(POLLING_INTERVAL_KEY, Integer.class);
+ }
+
+ @Override
+ protected void pullData() {
+ try {
+ adapterPipeline.process(getNextPosition());
+ } catch (IOException e) {
+ LOG.error("Could not fetch ISS location data", e);
+ }
+ }
+
+ private Map<String, Object> getNextPosition() throws IOException {
+ String response = Request
+ .Get(ISS_ENDPOINT_URL)
+ .execute()
+ .returnContent()
+ .asString();
+
+ IssModel issModel = new Gson().fromJson(response, IssModel.class);
+
+ return asMap(issModel);
+ }
+
+ private Map<String, Object> asMap(IssModel issModel) {
+ Map<String, Object> event = new HashMap<>();
+ event.put(Timestamp, issModel.getTimestamp());
+ event.put(Latitude, issModel.getIssPosition().getLatitude());
+ event.put(Longitude, issModel.getIssPosition().getLongitude());
+
+ return event;
+ }
+
+ @Override
+ protected PollingSettings getPollingInterval() {
+ return PollingSettings.from(TimeUnit.SECONDS, this.pollingIntervalInSeconds);
+ }
+
+ @Override
+ public SpecificAdapterStreamDescription declareModel() {
+ SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION)
+ .category(AdapterType.OpenData)
+ .requiredIntegerParameter(Labels.withId(POLLING_INTERVAL_KEY), 2)
+ .build();
+ description.setAppId(ID);
+
+ return description;
+ }
+
+ @Override
+ public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
+ return new IssAdapter(adapterDescription);
+ }
+
+ @Override
+ public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
+ return GuessSchemaBuilder.create()
+ .property(timestampProperty(Timestamp))
+ .property(doubleEp(Labels.from(Latitude, "Latitude", "The latitude value of the current ISS location"),
+ Latitude, Geo.lat))
+ .property(doubleEp(Labels.from(Longitude, "Longitude", "The longitude value of the current ISS " +
+ "location"),
+ Longitude, Geo.lng))
+ .build();
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+}
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/model/IssModel.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/model/IssModel.java
new file mode 100644
index 0000000..3dc6cf2
--- /dev/null
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/model/IssModel.java
@@ -0,0 +1,42 @@
+
+package org.apache.streampipes.connect.adapters.iss.model;
+
+import javax.annotation.Generated;
+import com.google.gson.annotations.SerializedName;
+
+@Generated("net.hexar.json2pojo")
+@SuppressWarnings("unused")
+public class IssModel {
+
+ @SerializedName("iss_position")
+ private IssPosition mIssPosition;
+ @SerializedName("message")
+ private String mMessage;
+ @SerializedName("timestamp")
+ private Long mTimestamp;
+
+ public IssPosition getIssPosition() {
+ return mIssPosition;
+ }
+
+ public void setIssPosition(IssPosition issPosition) {
+ mIssPosition = issPosition;
+ }
+
+ public String getMessage() {
+ return mMessage;
+ }
+
+ public void setMessage(String message) {
+ mMessage = message;
+ }
+
+ public Long getTimestamp() {
+ return mTimestamp;
+ }
+
+ public void setTimestamp(Long timestamp) {
+ mTimestamp = timestamp;
+ }
+
+}
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/model/IssPosition.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/model/IssPosition.java
new file mode 100644
index 0000000..77ea656
--- /dev/null
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/iss/model/IssPosition.java
@@ -0,0 +1,33 @@
+
+package org.apache.streampipes.connect.adapters.iss.model;
+
+import com.google.gson.annotations.SerializedName;
+
+import javax.annotation.Generated;
+
+@Generated("net.hexar.json2pojo")
+@SuppressWarnings("unused")
+public class IssPosition {
+
+ @SerializedName("latitude")
+ private Double mLatitude;
+ @SerializedName("longitude")
+ private Double mLongitude;
+
+ public Double getLatitude() {
+ return mLatitude;
+ }
+
+ public void setLatitude(Double latitude) {
+ mLatitude = latitude;
+ }
+
+ public Double getLongitude() {
+ return mLongitude;
+ }
+
+ public void setLongitude(Double longitude) {
+ mLongitude = longitude;
+ }
+
+}
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.iss/documentation.md b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.iss/documentation.md
new file mode 100644
index 0000000..e69de29
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.iss/strings.en b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.iss/strings.en
new file mode 100644
index 0000000..a24cfd8
--- /dev/null
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.iss/strings.en
@@ -0,0 +1,5 @@
+org.apache.streampipes.connect.adapters.iss.title=ISS Location
+org.apache.streampipes.connect.adapters.iss.description=Current Location of the International Space Station (ISS)
+
+polling-interval.title=Polling interval
+polling-interval.description=The update interval in seconds
\ No newline at end of file
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.bufferrest/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.bufferrest/strings.en
index 23b04eb..7175e23 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.bufferrest/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.bufferrest/strings.en
@@ -1,8 +1,8 @@
org.apache.streampipes.sinks.brokers.jvm.bufferrest.title=Buffered REST Publisher
-org.apache.streampipes.sinks.brokers.jvm.bufferrest.description=Collects a given amount of events into a JSON array. Once this event count is reached the JSON array is posted to the given REST interface.
+org.apache.streampipes.sinks.brokers.jvm.bufferrest.description=Once a given amount of events is reached, events are posted to the given REST interface.
bufferrest.uri.title=REST URL
-bufferrest.uri.description=URL of the REST endoint
+bufferrest.uri.description=URL of the REST endpoint
bufferrest.count.title=Buffer Size
bufferrest.count=The amount of events to buffer before sending them on
diff --git a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.ditto/documentation.md b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.ditto/documentation.md
index cc60f23..2737fd8 100644
--- a/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.ditto/documentation.md
+++ b/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.ditto/documentation.md
@@ -16,7 +16,7 @@
~
-->
-## CouchDB
+## Eclipse Ditto
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -38,19 +38,7 @@ This sink does not have any requirements and works with any incoming event type.
## Configuration
-Describe the configuration parameters here
-
-### Hostname
-
-The hostname of the CouchDB instance.
-
-### Port
-
-The port of the CouchDB instance.
-
-### Database Name
-
-The name of the database where events will be stored
+(tbd)
## Output