You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/03 20:44:09 UTC

[incubator-streampipes-extensions] 02/02: added geofence sink

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 3c62db155754745dab67069c90c307e37fcacdb2
Author: micklich <fl...@disy.net>
AuthorDate: Wed Jun 3 22:43:27 2020 +0200

    added geofence sink
---
 .../geo/jvm/jts/geofence/SpGeofencelDatabase.java  | 158 +++++++++++++--------
 .../jvm/jts/geofence/storing/StoreGeofence.java    |  58 +++++++-
 .../geofence/storing/StoreGeofenceController.java  |  18 +++
 .../geofence/storing/StoreGeofenceParameters.java  |  18 +++
 .../documentation.md                               |  30 +---
 .../icon.png                                       | Bin 9399 -> 13895 bytes
 .../strings.en                                     |  25 ++--
 7 files changed, 205 insertions(+), 102 deletions(-)

diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
index ca76166..8f29389 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
@@ -1,31 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.processors.geo.jvm.jts.geofence;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.schema.EventProperty;
 import org.locationtech.jts.geom.Geometry;
 
 import java.sql.*;
 import java.time.LocalDateTime;
+import java.util.List;
 
 public class SpGeofencelDatabase extends PostgresJdbcClient {
 
-  private String host;
-  private Integer port;
-  private String dbname;
-  protected String user;
-  protected String password;
-  private String schema;
-  private String driver;
-  private String urlname;
-  private String url;
-
-  protected String allowedRegEx;
-  protected Logger logger;
+
   protected String geofenceName;
+  protected String url;
+  protected String databaseName;
+  protected String tableName;
 
 
   // DB Tables
-  protected final String geofenceMainTable = "geofences";
   private final String PG_COLUMN_ID = "id";
   private final String PG_COLUMN_CREATED = "created_at";
   private final String PG_COLUMN_UPDATED = "updated_at";
@@ -37,22 +48,40 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     super();
   }
 
-  protected void initializeJdbc(String allowedRegEx,
-                                String geofenceName,
-                                Logger logger) throws SpRuntimeException {
-    this.allowedRegEx = allowedRegEx;
-    this.logger = logger;
+  protected void initializeJdbc(List<EventProperty> eventProperties,
+                                String host,
+                                Integer port,
+                                String databaseName,
+                                String tableName,
+                                String user,
+                                String password,
+                                String allowedRegEx,
+                                String driver,
+                                String urlName,
+                                Logger logger,
+                                String schemaName,
+                                boolean isToDropTable,
+                                String geofenceName) throws SpRuntimeException {
+    super.initializeJdbc(
+        eventProperties,
+        host,
+        port,
+        databaseName,
+        tableName,
+        user,
+        password,
+        allowedRegEx,
+        driver,
+        urlName,
+        logger,
+        schemaName,
+        isToDropTable);
+
+    this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
     this.geofenceName = geofenceName;
-
-    this.host = "localhost";
-    this.port = 54321;
-    this.dbname = "geo_streampipes";
-    this.user = "geo_streampipes";
-    this.password = "bQgu\"FUR_VH6z>~j";
-    this.schema = "geofence";
-    this.driver = "org.postgresql.Driver";
-    this.urlname = "postgresql";
-    this.url = "jdbc:" + urlname + "://" + host + ":" + port + "/";
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
 
     try {
       Class.forName(driver);
@@ -66,7 +95,7 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
   protected void connect() throws SpRuntimeException {
     try {
       c = DriverManager.getConnection(url, user, password);
-      ensureSchemaExists(url, dbname);
+      ensureSchemaExists(this.url, databaseName);
       createGeofenceTable();
       createGeofenceTables();
 
@@ -75,29 +104,41 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     }
   }
 
+
+  private String createGeofenceTableQuery(){
+
+    StringBuilder statement = new StringBuilder("CREATE TABLE ");
+    statement.append(schemaName);
+    statement.append(".");
+    statement.append(tableName).append(" ( ");
+    statement.append(PG_COLUMN_ID + " SERIAL PRIMARY KEY, ");
+    statement.append(PG_COLUMN_CREATED + " TIMESTAMP, ");
+    statement.append(PG_COLUMN_UPDATED + " TIMESTAMP, ");
+    statement.append(PG_COLUMN_GEOFENCENAME + " TEXT NOT NULL UNIQUE, ");
+    statement.append(PG_COLUMN_GEOMETRY + " GEOMETRY");
+    statement.append(" );");
+
+    return statement.toString();
+  }
   private boolean createGeofenceTable() throws SpRuntimeException {
     boolean returnValue = false;
     try {
-      c = DriverManager.getConnection(url + dbname, user, password);
+      c = DriverManager.getConnection(url + databaseName, user, password);
       st = c.createStatement();
-      ResultSet rs = c.getMetaData().getTables(null, null, geofenceMainTable, null);
-      while (rs.next()) {
-        // same table names can exists in different schmemas
-        if (!rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
-
-          StringBuilder statement = new StringBuilder("CREATE TABLE ");
-          statement.append(schemaName);
-          statement.append(".");
-          statement.append(geofenceMainTable).append(" ( ");
-          statement.append(PG_COLUMN_ID + " SERIAL PRIMARY KEY,");
-          statement.append(PG_COLUMN_CREATED + " TIMESTAMP,");
-          statement.append(PG_COLUMN_UPDATED + " TIMESTAMP,");
-          statement.append(PG_COLUMN_GEOFENCENAME + " TEXT NOT NULL UNIQUE");
-          statement.append(PG_COLUMN_GEOMETRY + " GEOMETRY,");
-          statement.append(" );");
-
+      ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+      //if table does not exist create it
+      if (!rs.next()) {
+        try {
+          st.executeUpdate(createGeofenceTableQuery());
+          returnValue = true;
+        } catch (SQLException e) {
+          throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+        }
+      } else {
+        // ok so table exist but in the right schema? and if not, create table
+        if (!(rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase()))) {
           try {
-            st.executeUpdate(statement.toString());
+            st.executeUpdate(createGeofenceTableQuery());
             returnValue = true;
           } catch (SQLException e) {
             throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
@@ -113,19 +154,17 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     return returnValue;
   }
 
-
   public void createGeofenceTables() throws SpRuntimeException {
     try {
-      c = DriverManager.getConnection(url + dbname, user, password);
+      c = DriverManager.getConnection(url + databaseName, user, password);
       st = c.createStatement();
 
       LocalDateTime settime = LocalDateTime.now();
 
-
       StringBuilder statement = new StringBuilder("INSERT INTO ");
       statement.append(schemaName);
       statement.append(".");
-      statement.append(geofenceMainTable);
+      statement.append(tableName);
       statement.append(" (" + PG_COLUMN_CREATED + ", " + PG_COLUMN_GEOFENCENAME + ")");
       statement.append(" VALUES ('" + settime + "' , '" + geofenceName + "');");
 
@@ -134,7 +173,7 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     } catch (SQLException e1) {
       // Unique-Constraint validation try with recursive solution
       closeAll();
-      throw new SpRuntimeException("Geofence Name is already taken. Please try another name");
+      throw new SpRuntimeException("Geofence Name is already taken. Please try another name" +e1.getMessage());
     }
   }
 
@@ -145,25 +184,26 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     LocalDateTime settime = LocalDateTime.now();
 
     try {
-      c = DriverManager.getConnection(url + dbname, user, password);
+      c = DriverManager.getConnection(url + databaseName, user, password);
       st = c.createStatement();
 
       StringBuilder statement = new StringBuilder("UPDATE ");
       statement.append(schemaName);
       statement.append(".");
-      statement.append(geofenceMainTable);
+      statement.append(tableName);
       statement.append(" SET ");
-      statement.append(PG_COLUMN_UPDATED).append(" = ").append(settime).append(",");
+      statement.append(PG_COLUMN_UPDATED).append(" = '").append(settime).append("',");
       statement.append(PG_COLUMN_GEOMETRY).append(" = ");
       statement.append("ST_GeomFromText('").append(geom.toText()).append("' ,").append(geom.getSRID()).append(")");
-      statement.append("WHERE name = ").append(geofenceName);
+      statement.append(" WHERE ");
+      statement.append(PG_COLUMN_GEOFENCENAME).append(" = '").append(geofenceName).append("'");
       statement.append(";");
 
       st.executeUpdate(statement.toString());
       result = true;
 
     } catch (SQLException e) {
-      throw new SpRuntimeException(e.getMessage());
+      throw new SpRuntimeException("Something went wrong during table creation with error message :" + e.getMessage());
     }
 
     return result;
@@ -174,14 +214,14 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     boolean result;
 
     try {
-      c = DriverManager.getConnection(url + dbname, user, password);
+      c = DriverManager.getConnection(url + databaseName, user, password);
       st = c.createStatement();
 
 
       StringBuilder statement = new StringBuilder("DELETE FROM ");
       statement.append(schemaName);
       statement.append(".");
-      statement.append(geofenceMainTable);
+      statement.append(tableName);
       statement.append(" WHERE ");
       statement.append(PG_COLUMN_GEOFENCENAME).append(" = '").append(geofenceName).append("';");
 
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
index c345a0c..6882bca 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
 
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -10,6 +28,7 @@ import org.apache.streampipes.wrapper.runtime.EventSink;
 import org.apache.streampipes.processors.geo.jvm.jts.geofence.SpGeofencelDatabase;
 
 
+
 public class StoreGeofence extends SpGeofencelDatabase implements EventSink<StoreGeofenceParameters> {
 
   private static Logger LOG;
@@ -19,6 +38,18 @@ public class StoreGeofence extends SpGeofencelDatabase implements EventSink<Stor
   private String geom_wkt;
   private String epsg_code;
 
+  private String host;
+  private Integer port;
+  private String user;
+  private String password;
+  private String schema;
+  private String driver;
+  private String urlName;
+  private String allowedRegEx;
+  private  String geofenceTable;
+
+
+
 
   @Override
   public void onInvocation(StoreGeofenceParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
@@ -28,7 +59,32 @@ public class StoreGeofence extends SpGeofencelDatabase implements EventSink<Stor
     this.epsg_code = parameters.getEpsg();
     this.geofenceName = parameters.getGeofenceName();
 
-    initializeJdbc("^[a-zA-Z_][a-zA-Z0-9_]*$", geofenceName, LOG);
+
+    this.host = "localhost";
+    this.port = 54321;
+    this.databaseName = "geo_streampipes";
+    this.user = "geo_streampipes";
+    this.password = "bQgu\"FUR_VH6z>~j";
+    this.schema = "geofence";
+    this.driver = "org.postgresql.Driver";
+    this.urlName = "postgresql";
+    this.geofenceTable = "geofences";
+
+    initializeJdbc(
+        parameters.getGraph().getInputStreams().get(0).getEventSchema().getEventProperties(),
+        host,
+        port,
+        databaseName,
+        geofenceTable,
+        user,
+        password,
+        "^[a-zA-Z_][a-zA-Z0-9_]*$",
+        driver,
+        urlName,
+        LOG,
+        schema,
+        false,
+        geofenceName);
   }
 
 
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
index 1df30c8..15b4a8f 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
 
 import org.apache.streampipes.model.DataSinkType;
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
index 332a8c6..e1297a3 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
 
 import org.apache.streampipes.model.graph.DataSinkInvocation;
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
index 1fbec38..d5b5b95 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## Trajectory from JTS Point
+## Geofence Sink
 
 <p align="center">
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -33,45 +33,25 @@ This processor creates a JTS LineString geometry from  JTS Points events, repres
 
 *  WKT String of a JTS Point Geometry
 *  Integer value representing EPSG code
-*  Number value for M-value
+*  String Geofence name
 
 
 ***
 
 ## Configuration
 
-Creates a JTS Geometry LineString from a JTS Point Geometries events representing a trajectory.
+STores a geometry in the internal PostGIS database. Geofence can be extraxted with the Geofence Enricher and chose geofence table.
 
 
 ### 1st parameter
-Point WKT String
+Geometry WKT String
 
 ### 2nd parameter
 EPSG code value
 
 ### 3rd parameter
-M-value for each sub-point of the trajectory
-
-### 4rd parameter
-String for a description text for the trajectory
-
-### 5rd parameter
-Number of allowed sub-points
+name of the geofence. Has to be unique compared with other geofences.
 
 ***
 
-## Output
-
-Adds a LineString geometry in the Well Known Text to the event, representing a trajectory. Also the description text is added to the event stream. The first existing event creates an empty LineString.
-
 ### Example
-Creating a LineString with a threshold of 2 allowed sub-points:
-
-* First Event:
-  * Point(8.12 41.23) --> LineString <empty>
-* Second Event:
-  * Point(8.56 41.25) --> LineString(8.12 41.23, 8.56 41.25)
-* Second Event:
-  * Point(8.84 40.98) --> LineString(8.56 41.25, 8.84 40.98)
-
-M-value is not represented in the LineString but will be stored for internal use!
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png
index 7389006..810f80c 100644
Binary files a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png and b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png differ
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
index 6dba4e1..b878b35 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
@@ -1,20 +1,11 @@
-org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.title=Single Trajectory Creator
-org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.description=Creates a trajectory from JTS point events
+org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.title=Geofence Sink
+org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.description=Stores a single geometry in a geofence PostGIS table
 
-point-key.title=JTS Point Event
-point-key.description=Single Point Event which will be added to the trajectory
+geometry-key.title=JTS Geometry Event
+geometry-key.description=Single Geometry which will be stored in the sink
 
-point-key.title=JTS Point Event
-point-key.description=Single Point Event which will be added to the trajectory
+epsg-key.title=EPSG-Code
+epsg-key.description=EPSG-Code of the geometry
 
-epsg-key.title=CRS of Input Point
-epsg-key.description=EPSG-Code of input point
-
-m-key.title=measurement value
-m-key.description=Measurement value which will be stored with each point event
-
-description-key.title=description text of trajectory
-description-key.description=A description text for the trajectory
-
-subpoints-key.title=number of allowed sub-points
-subpoints-key.description=amount of allowed sub-points, creating the trajectory
+geofence-key.title=Geofence Name
+geofence-key.description=Unique name of the geofence
\ No newline at end of file