You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/03 20:44:09 UTC
[incubator-streampipes-extensions] 02/02: added geofence sink
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 3c62db155754745dab67069c90c307e37fcacdb2
Author: micklich <fl...@disy.net>
AuthorDate: Wed Jun 3 22:43:27 2020 +0200
added geofence sink
---
.../geo/jvm/jts/geofence/SpGeofencelDatabase.java | 158 +++++++++++++--------
.../jvm/jts/geofence/storing/StoreGeofence.java | 58 +++++++-
.../geofence/storing/StoreGeofenceController.java | 18 +++
.../geofence/storing/StoreGeofenceParameters.java | 18 +++
.../documentation.md | 30 +---
.../icon.png | Bin 9399 -> 13895 bytes
.../strings.en | 25 ++--
7 files changed, 205 insertions(+), 102 deletions(-)
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
index ca76166..8f29389 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
@@ -1,31 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.streampipes.processors.geo.jvm.jts.geofence;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.schema.EventProperty;
import org.locationtech.jts.geom.Geometry;
import java.sql.*;
import java.time.LocalDateTime;
+import java.util.List;
public class SpGeofencelDatabase extends PostgresJdbcClient {
- private String host;
- private Integer port;
- private String dbname;
- protected String user;
- protected String password;
- private String schema;
- private String driver;
- private String urlname;
- private String url;
-
- protected String allowedRegEx;
- protected Logger logger;
+
protected String geofenceName;
+ protected String url;
+ protected String databaseName;
+ protected String tableName;
// DB Tables
- protected final String geofenceMainTable = "geofences";
private final String PG_COLUMN_ID = "id";
private final String PG_COLUMN_CREATED = "created_at";
private final String PG_COLUMN_UPDATED = "updated_at";
@@ -37,22 +48,40 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
super();
}
- protected void initializeJdbc(String allowedRegEx,
- String geofenceName,
- Logger logger) throws SpRuntimeException {
- this.allowedRegEx = allowedRegEx;
- this.logger = logger;
+ protected void initializeJdbc(List<EventProperty> eventProperties,
+ String host,
+ Integer port,
+ String databaseName,
+ String tableName,
+ String user,
+ String password,
+ String allowedRegEx,
+ String driver,
+ String urlName,
+ Logger logger,
+ String schemaName,
+ boolean isToDropTable,
+ String geofenceName) throws SpRuntimeException {
+ super.initializeJdbc(
+ eventProperties,
+ host,
+ port,
+ databaseName,
+ tableName,
+ user,
+ password,
+ allowedRegEx,
+ driver,
+ urlName,
+ logger,
+ schemaName,
+ isToDropTable);
+
+ this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
this.geofenceName = geofenceName;
-
- this.host = "localhost";
- this.port = 54321;
- this.dbname = "geo_streampipes";
- this.user = "geo_streampipes";
- this.password = "bQgu\"FUR_VH6z>~j";
- this.schema = "geofence";
- this.driver = "org.postgresql.Driver";
- this.urlname = "postgresql";
- this.url = "jdbc:" + urlname + "://" + host + ":" + port + "/";
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
try {
Class.forName(driver);
@@ -66,7 +95,7 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
protected void connect() throws SpRuntimeException {
try {
c = DriverManager.getConnection(url, user, password);
- ensureSchemaExists(url, dbname);
+ ensureSchemaExists(this.url, databaseName);
createGeofenceTable();
createGeofenceTables();
@@ -75,29 +104,41 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
}
}
+
+ private String createGeofenceTableQuery(){
+
+ StringBuilder statement = new StringBuilder("CREATE TABLE ");
+ statement.append(schemaName);
+ statement.append(".");
+ statement.append(tableName).append(" ( ");
+ statement.append(PG_COLUMN_ID + " SERIAL PRIMARY KEY, ");
+ statement.append(PG_COLUMN_CREATED + " TIMESTAMP, ");
+ statement.append(PG_COLUMN_UPDATED + " TIMESTAMP, ");
+ statement.append(PG_COLUMN_GEOFENCENAME + " TEXT NOT NULL UNIQUE, ");
+ statement.append(PG_COLUMN_GEOMETRY + " GEOMETRY");
+ statement.append(" );");
+
+ return statement.toString();
+ }
private boolean createGeofenceTable() throws SpRuntimeException {
boolean returnValue = false;
try {
- c = DriverManager.getConnection(url + dbname, user, password);
+ c = DriverManager.getConnection(url + databaseName, user, password);
st = c.createStatement();
- ResultSet rs = c.getMetaData().getTables(null, null, geofenceMainTable, null);
- while (rs.next()) {
- // same table names can exists in different schmemas
- if (!rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
-
- StringBuilder statement = new StringBuilder("CREATE TABLE ");
- statement.append(schemaName);
- statement.append(".");
- statement.append(geofenceMainTable).append(" ( ");
- statement.append(PG_COLUMN_ID + " SERIAL PRIMARY KEY,");
- statement.append(PG_COLUMN_CREATED + " TIMESTAMP,");
- statement.append(PG_COLUMN_UPDATED + " TIMESTAMP,");
- statement.append(PG_COLUMN_GEOFENCENAME + " TEXT NOT NULL UNIQUE");
- statement.append(PG_COLUMN_GEOMETRY + " GEOMETRY,");
- statement.append(" );");
-
+ ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+ //if table does not exist create it
+ if (!rs.next()) {
+ try {
+ st.executeUpdate(createGeofenceTableQuery());
+ returnValue = true;
+ } catch (SQLException e) {
+ throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+ }
+ } else {
+ // ok so table exist but in the right schema? and if not, create table
+ if (!(rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase()))) {
try {
- st.executeUpdate(statement.toString());
+ st.executeUpdate(createGeofenceTableQuery());
returnValue = true;
} catch (SQLException e) {
throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
@@ -113,19 +154,17 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
return returnValue;
}
-
public void createGeofenceTables() throws SpRuntimeException {
try {
- c = DriverManager.getConnection(url + dbname, user, password);
+ c = DriverManager.getConnection(url + databaseName, user, password);
st = c.createStatement();
LocalDateTime settime = LocalDateTime.now();
-
StringBuilder statement = new StringBuilder("INSERT INTO ");
statement.append(schemaName);
statement.append(".");
- statement.append(geofenceMainTable);
+ statement.append(tableName);
statement.append(" (" + PG_COLUMN_CREATED + ", " + PG_COLUMN_GEOFENCENAME + ")");
statement.append(" VALUES ('" + settime + "' , '" + geofenceName + "');");
@@ -134,7 +173,7 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
} catch (SQLException e1) {
// Unique-Constraint validation try with recursive solution
closeAll();
- throw new SpRuntimeException("Geofence Name is already taken. Please try another name");
+ throw new SpRuntimeException("Geofence Name is already taken. Please try another name" +e1.getMessage());
}
}
@@ -145,25 +184,26 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
LocalDateTime settime = LocalDateTime.now();
try {
- c = DriverManager.getConnection(url + dbname, user, password);
+ c = DriverManager.getConnection(url + databaseName, user, password);
st = c.createStatement();
StringBuilder statement = new StringBuilder("UPDATE ");
statement.append(schemaName);
statement.append(".");
- statement.append(geofenceMainTable);
+ statement.append(tableName);
statement.append(" SET ");
- statement.append(PG_COLUMN_UPDATED).append(" = ").append(settime).append(",");
+ statement.append(PG_COLUMN_UPDATED).append(" = '").append(settime).append("',");
statement.append(PG_COLUMN_GEOMETRY).append(" = ");
statement.append("ST_GeomFromText('").append(geom.toText()).append("' ,").append(geom.getSRID()).append(")");
- statement.append("WHERE name = ").append(geofenceName);
+ statement.append(" WHERE ");
+ statement.append(PG_COLUMN_GEOFENCENAME).append(" = '").append(geofenceName).append("'");
statement.append(";");
st.executeUpdate(statement.toString());
result = true;
} catch (SQLException e) {
- throw new SpRuntimeException(e.getMessage());
+ throw new SpRuntimeException("Something went wrong during table creation with error message :" + e.getMessage());
}
return result;
@@ -174,14 +214,14 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
boolean result;
try {
- c = DriverManager.getConnection(url + dbname, user, password);
+ c = DriverManager.getConnection(url + databaseName, user, password);
st = c.createStatement();
StringBuilder statement = new StringBuilder("DELETE FROM ");
statement.append(schemaName);
statement.append(".");
- statement.append(geofenceMainTable);
+ statement.append(tableName);
statement.append(" WHERE ");
statement.append(PG_COLUMN_GEOFENCENAME).append(" = '").append(geofenceName).append("';");
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
index c345a0c..6882bca 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -10,6 +28,7 @@ import org.apache.streampipes.wrapper.runtime.EventSink;
import org.apache.streampipes.processors.geo.jvm.jts.geofence.SpGeofencelDatabase;
+
public class StoreGeofence extends SpGeofencelDatabase implements EventSink<StoreGeofenceParameters> {
private static Logger LOG;
@@ -19,6 +38,18 @@ public class StoreGeofence extends SpGeofencelDatabase implements EventSink<Stor
private String geom_wkt;
private String epsg_code;
+ private String host;
+ private Integer port;
+ private String user;
+ private String password;
+ private String schema;
+ private String driver;
+ private String urlName;
+ private String allowedRegEx;
+ private String geofenceTable;
+
+
+
@Override
public void onInvocation(StoreGeofenceParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
@@ -28,7 +59,32 @@ public class StoreGeofence extends SpGeofencelDatabase implements EventSink<Stor
this.epsg_code = parameters.getEpsg();
this.geofenceName = parameters.getGeofenceName();
- initializeJdbc("^[a-zA-Z_][a-zA-Z0-9_]*$", geofenceName, LOG);
+
+ this.host = "localhost";
+ this.port = 54321;
+ this.databaseName = "geo_streampipes";
+ this.user = "geo_streampipes";
+ this.password = "bQgu\"FUR_VH6z>~j";
+ this.schema = "geofence";
+ this.driver = "org.postgresql.Driver";
+ this.urlName = "postgresql";
+ this.geofenceTable = "geofences";
+
+ initializeJdbc(
+ parameters.getGraph().getInputStreams().get(0).getEventSchema().getEventProperties(),
+ host,
+ port,
+ databaseName,
+ geofenceTable,
+ user,
+ password,
+ "^[a-zA-Z_][a-zA-Z0-9_]*$",
+ driver,
+ urlName,
+ LOG,
+ schema,
+ false,
+ geofenceName);
}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
index 1df30c8..15b4a8f 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
import org.apache.streampipes.model.DataSinkType;
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
index 332a8c6..e1297a3 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
import org.apache.streampipes.model.graph.DataSinkInvocation;
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
index 1fbec38..d5b5b95 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
@@ -16,7 +16,7 @@
~
-->
-## Trajectory from JTS Point
+## Geofence Sink
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -33,45 +33,25 @@ This processor creates a JTS LineString geometry from JTS Points events, repres
* WKT String of a JTS Point Geometry
* Integer value representing EPSG code
-* Number value for M-value
+* String Geofence name
***
## Configuration
-Creates a JTS Geometry LineString from a JTS Point Geometries events representing a trajectory.
+Stores a geometry in the internal PostGIS database. The geofence can later be extracted with the Geofence Enricher by choosing the geofence table.
### 1st parameter
-Point WKT String
+Geometry WKT String
### 2nd parameter
EPSG code value
### 3rd parameter
-M-value for each sub-point of the trajectory
-
-### 4rd parameter
-String for a description text for the trajectory
-
-### 5rd parameter
-Number of allowed sub-points
+Name of the geofence. It has to be unique among all stored geofences.
***
-## Output
-
-Adds a LineString geometry in the Well Known Text to the event, representing a trajectory. Also the description text is added to the event stream. The first existing event creates an empty LineString.
-
### Example
-Creating a LineString with a threshold of 2 allowed sub-points:
-
-* First Event:
- * Point(8.12 41.23) --> LineString <empty>
-* Second Event:
- * Point(8.56 41.25) --> LineString(8.12 41.23, 8.56 41.25)
-* Second Event:
- * Point(8.84 40.98) --> LineString(8.56 41.25, 8.84 40.98)
-
-M-value is not represented in the LineString but will be stored for internal use!
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png
index 7389006..810f80c 100644
Binary files a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png and b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png differ
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
index 6dba4e1..b878b35 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
@@ -1,20 +1,11 @@
-org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.title=Single Trajectory Creator
-org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.description=Creates a trajectory from JTS point events
+org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.title=Geofence Sink
+org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.description=Stores a single geometry in a geofence PostGIS table
-point-key.title=JTS Point Event
-point-key.description=Single Point Event which will be added to the trajectory
+geometry-key.title=JTS Geometry Event
+geometry-key.description=Single Geometry which will be stored in the sink
-point-key.title=JTS Point Event
-point-key.description=Single Point Event which will be added to the trajectory
+epsg-key.title=EPSG-Code
+epsg-key.description=EPSG-Code of the geometry
-epsg-key.title=CRS of Input Point
-epsg-key.description=EPSG-Code of input point
-
-m-key.title=measurement value
-m-key.description=Measurement value which will be stored with each point event
-
-description-key.title=description text of trajectory
-description-key.description=A description text for the trajectory
-
-subpoints-key.title=number of allowed sub-points
-subpoints-key.description=amount of allowed sub-points, creating the trajectory
+geofence-key.title=Geofence Name
+geofence-key.description=Unique name of the geofence
\ No newline at end of file