You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/03 20:44:07 UTC

[incubator-streampipes-extensions] branch feature/postgis-sink updated (4c629c1 -> 3c62db1)

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a change to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git.


    from 4c629c1  added PostgresJBDC-Client and rechanges IoT-DB
     new bf1c03f  geofence part1
     new 3c62db1  added geofence sink

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streampipes/processors/geo/jvm/GeoJvmInit.java |   4 +-
 .../geo/jvm/jts/geofence/PostgresJdbcClient.java   | 682 +++++++++++++++++++++
 .../geo/jvm/jts/geofence/SpGeofencelDatabase.java  | 237 +++++++
 .../jvm/jts/geofence/storing/StoreGeofence.java    | 107 ++++
 .../geofence/storing/StoreGeofenceController.java  |  77 +++
 .../geofence/storing/StoreGeofenceParameters.java  |  32 +-
 .../documentation.md                               |  30 +-
 .../icon.png                                       | Bin 0 -> 13895 bytes
 .../strings.en                                     |  11 +
 .../jvm/postgresql/PostgresJdbcClient.java         |   2 +-
 10 files changed, 1144 insertions(+), 38 deletions(-)
 create mode 100644 streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/PostgresJdbcClient.java
 create mode 100644 streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
 create mode 100755 streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
 create mode 100755 streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
 copy streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchParameters.java => streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java (63%)
 mode change 100644 => 100755
 copy streampipes-processors-geo-jvm/src/main/resources/{org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory => org.apache.streampipes.processors.geo.jvm.jts.geofence.storing}/documentation.md (67%)
 create mode 100644 streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png
 create mode 100644 streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en


[incubator-streampipes-extensions] 02/02: added geofence sink

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 3c62db155754745dab67069c90c307e37fcacdb2
Author: micklich <fl...@disy.net>
AuthorDate: Wed Jun 3 22:43:27 2020 +0200

    added geofence sink
---
 .../geo/jvm/jts/geofence/SpGeofencelDatabase.java  | 158 +++++++++++++--------
 .../jvm/jts/geofence/storing/StoreGeofence.java    |  58 +++++++-
 .../geofence/storing/StoreGeofenceController.java  |  18 +++
 .../geofence/storing/StoreGeofenceParameters.java  |  18 +++
 .../documentation.md                               |  30 +---
 .../icon.png                                       | Bin 9399 -> 13895 bytes
 .../strings.en                                     |  25 ++--
 7 files changed, 205 insertions(+), 102 deletions(-)

diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
index ca76166..8f29389 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
@@ -1,31 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.processors.geo.jvm.jts.geofence;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.schema.EventProperty;
 import org.locationtech.jts.geom.Geometry;
 
 import java.sql.*;
 import java.time.LocalDateTime;
+import java.util.List;
 
 public class SpGeofencelDatabase extends PostgresJdbcClient {
 
-  private String host;
-  private Integer port;
-  private String dbname;
-  protected String user;
-  protected String password;
-  private String schema;
-  private String driver;
-  private String urlname;
-  private String url;
-
-  protected String allowedRegEx;
-  protected Logger logger;
+
   protected String geofenceName;
+  protected String url;
+  protected String databaseName;
+  protected String tableName;
 
 
   // DB Tables
-  protected final String geofenceMainTable = "geofences";
   private final String PG_COLUMN_ID = "id";
   private final String PG_COLUMN_CREATED = "created_at";
   private final String PG_COLUMN_UPDATED = "updated_at";
@@ -37,22 +48,40 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     super();
   }
 
-  protected void initializeJdbc(String allowedRegEx,
-                                String geofenceName,
-                                Logger logger) throws SpRuntimeException {
-    this.allowedRegEx = allowedRegEx;
-    this.logger = logger;
+  protected void initializeJdbc(List<EventProperty> eventProperties,
+                                String host,
+                                Integer port,
+                                String databaseName,
+                                String tableName,
+                                String user,
+                                String password,
+                                String allowedRegEx,
+                                String driver,
+                                String urlName,
+                                Logger logger,
+                                String schemaName,
+                                boolean isToDropTable,
+                                String geofenceName) throws SpRuntimeException {
+    super.initializeJdbc(
+        eventProperties,
+        host,
+        port,
+        databaseName,
+        tableName,
+        user,
+        password,
+        allowedRegEx,
+        driver,
+        urlName,
+        logger,
+        schemaName,
+        isToDropTable);
+
+    this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
     this.geofenceName = geofenceName;
-
-    this.host = "localhost";
-    this.port = 54321;
-    this.dbname = "geo_streampipes";
-    this.user = "geo_streampipes";
-    this.password = "bQgu\"FUR_VH6z>~j";
-    this.schema = "geofence";
-    this.driver = "org.postgresql.Driver";
-    this.urlname = "postgresql";
-    this.url = "jdbc:" + urlname + "://" + host + ":" + port + "/";
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
 
     try {
       Class.forName(driver);
@@ -66,7 +95,7 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
   protected void connect() throws SpRuntimeException {
     try {
       c = DriverManager.getConnection(url, user, password);
-      ensureSchemaExists(url, dbname);
+      ensureSchemaExists(this.url, databaseName);
       createGeofenceTable();
       createGeofenceTables();
 
@@ -75,29 +104,41 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     }
   }
 
+
+  private String createGeofenceTableQuery(){
+
+    StringBuilder statement = new StringBuilder("CREATE TABLE ");
+    statement.append(schemaName);
+    statement.append(".");
+    statement.append(tableName).append(" ( ");
+    statement.append(PG_COLUMN_ID + " SERIAL PRIMARY KEY, ");
+    statement.append(PG_COLUMN_CREATED + " TIMESTAMP, ");
+    statement.append(PG_COLUMN_UPDATED + " TIMESTAMP, ");
+    statement.append(PG_COLUMN_GEOFENCENAME + " TEXT NOT NULL UNIQUE, ");
+    statement.append(PG_COLUMN_GEOMETRY + " GEOMETRY");
+    statement.append(" );");
+
+    return statement.toString();
+  }
   private boolean createGeofenceTable() throws SpRuntimeException {
     boolean returnValue = false;
     try {
-      c = DriverManager.getConnection(url + dbname, user, password);
+      c = DriverManager.getConnection(url + databaseName, user, password);
       st = c.createStatement();
-      ResultSet rs = c.getMetaData().getTables(null, null, geofenceMainTable, null);
-      while (rs.next()) {
-        // same table names can exists in different schmemas
-        if (!rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
-
-          StringBuilder statement = new StringBuilder("CREATE TABLE ");
-          statement.append(schemaName);
-          statement.append(".");
-          statement.append(geofenceMainTable).append(" ( ");
-          statement.append(PG_COLUMN_ID + " SERIAL PRIMARY KEY,");
-          statement.append(PG_COLUMN_CREATED + " TIMESTAMP,");
-          statement.append(PG_COLUMN_UPDATED + " TIMESTAMP,");
-          statement.append(PG_COLUMN_GEOFENCENAME + " TEXT NOT NULL UNIQUE");
-          statement.append(PG_COLUMN_GEOMETRY + " GEOMETRY,");
-          statement.append(" );");
-
+      ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+      //if table does not exist create it
+      if (!rs.next()) {
+        try {
+          st.executeUpdate(createGeofenceTableQuery());
+          returnValue = true;
+        } catch (SQLException e) {
+          throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+        }
+      } else {
+        // The table exists, but is it in the right schema? If not, create the table there.
+        if (!(rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase()))) {
           try {
-            st.executeUpdate(statement.toString());
+            st.executeUpdate(createGeofenceTableQuery());
             returnValue = true;
           } catch (SQLException e) {
             throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
@@ -113,19 +154,17 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     return returnValue;
   }
 
-
   public void createGeofenceTables() throws SpRuntimeException {
     try {
-      c = DriverManager.getConnection(url + dbname, user, password);
+      c = DriverManager.getConnection(url + databaseName, user, password);
       st = c.createStatement();
 
       LocalDateTime settime = LocalDateTime.now();
 
-
       StringBuilder statement = new StringBuilder("INSERT INTO ");
       statement.append(schemaName);
       statement.append(".");
-      statement.append(geofenceMainTable);
+      statement.append(tableName);
       statement.append(" (" + PG_COLUMN_CREATED + ", " + PG_COLUMN_GEOFENCENAME + ")");
       statement.append(" VALUES ('" + settime + "' , '" + geofenceName + "');");
 
@@ -134,7 +173,7 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     } catch (SQLException e1) {
       // Unique-Constraint validation try with recursive solution
       closeAll();
-      throw new SpRuntimeException("Geofence Name is already taken. Please try another name");
+      throw new SpRuntimeException("Geofence Name is already taken. Please try another name" +e1.getMessage());
     }
   }
 
@@ -145,25 +184,26 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     LocalDateTime settime = LocalDateTime.now();
 
     try {
-      c = DriverManager.getConnection(url + dbname, user, password);
+      c = DriverManager.getConnection(url + databaseName, user, password);
       st = c.createStatement();
 
       StringBuilder statement = new StringBuilder("UPDATE ");
       statement.append(schemaName);
       statement.append(".");
-      statement.append(geofenceMainTable);
+      statement.append(tableName);
       statement.append(" SET ");
-      statement.append(PG_COLUMN_UPDATED).append(" = ").append(settime).append(",");
+      statement.append(PG_COLUMN_UPDATED).append(" = '").append(settime).append("',");
       statement.append(PG_COLUMN_GEOMETRY).append(" = ");
       statement.append("ST_GeomFromText('").append(geom.toText()).append("' ,").append(geom.getSRID()).append(")");
-      statement.append("WHERE name = ").append(geofenceName);
+      statement.append(" WHERE ");
+      statement.append(PG_COLUMN_GEOFENCENAME).append(" = '").append(geofenceName).append("'");
       statement.append(";");
 
       st.executeUpdate(statement.toString());
       result = true;
 
     } catch (SQLException e) {
-      throw new SpRuntimeException(e.getMessage());
+      throw new SpRuntimeException("Something went wrong during table creation with error message :" + e.getMessage());
     }
 
     return result;
@@ -174,14 +214,14 @@ public class SpGeofencelDatabase extends PostgresJdbcClient {
     boolean result;
 
     try {
-      c = DriverManager.getConnection(url + dbname, user, password);
+      c = DriverManager.getConnection(url + databaseName, user, password);
       st = c.createStatement();
 
 
       StringBuilder statement = new StringBuilder("DELETE FROM ");
       statement.append(schemaName);
       statement.append(".");
-      statement.append(geofenceMainTable);
+      statement.append(tableName);
       statement.append(" WHERE ");
       statement.append(PG_COLUMN_GEOFENCENAME).append(" = '").append(geofenceName).append("';");
 
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
index c345a0c..6882bca 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
 
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -10,6 +28,7 @@ import org.apache.streampipes.wrapper.runtime.EventSink;
 import org.apache.streampipes.processors.geo.jvm.jts.geofence.SpGeofencelDatabase;
 
 
+
 public class StoreGeofence extends SpGeofencelDatabase implements EventSink<StoreGeofenceParameters> {
 
   private static Logger LOG;
@@ -19,6 +38,18 @@ public class StoreGeofence extends SpGeofencelDatabase implements EventSink<Stor
   private String geom_wkt;
   private String epsg_code;
 
+  private String host;
+  private Integer port;
+  private String user;
+  private String password;
+  private String schema;
+  private String driver;
+  private String urlName;
+  private String allowedRegEx;
+  private  String geofenceTable;
+
+
+
 
   @Override
   public void onInvocation(StoreGeofenceParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
@@ -28,7 +59,32 @@ public class StoreGeofence extends SpGeofencelDatabase implements EventSink<Stor
     this.epsg_code = parameters.getEpsg();
     this.geofenceName = parameters.getGeofenceName();
 
-    initializeJdbc("^[a-zA-Z_][a-zA-Z0-9_]*$", geofenceName, LOG);
+
+    this.host = "localhost";
+    this.port = 54321;
+    this.databaseName = "geo_streampipes";
+    this.user = "geo_streampipes";
+    this.password = "bQgu\"FUR_VH6z>~j";
+    this.schema = "geofence";
+    this.driver = "org.postgresql.Driver";
+    this.urlName = "postgresql";
+    this.geofenceTable = "geofences";
+
+    initializeJdbc(
+        parameters.getGraph().getInputStreams().get(0).getEventSchema().getEventProperties(),
+        host,
+        port,
+        databaseName,
+        geofenceTable,
+        user,
+        password,
+        "^[a-zA-Z_][a-zA-Z0-9_]*$",
+        driver,
+        urlName,
+        LOG,
+        schema,
+        false,
+        geofenceName);
   }
 
 
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
index 1df30c8..15b4a8f 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
 
 import org.apache.streampipes.model.DataSinkType;
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
index 332a8c6..e1297a3 100755
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
 
 import org.apache.streampipes.model.graph.DataSinkInvocation;
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
index 1fbec38..d5b5b95 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## Trajectory from JTS Point
+## Geofence Sink
 
 <p align="center">
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -33,45 +33,25 @@ This processor creates a JTS LineString geometry from  JTS Points events, repres
 
 *  WKT String of a JTS Point Geometry
 *  Integer value representing EPSG code
-*  Number value for M-value
+*  String Geofence name
 
 
 ***
 
 ## Configuration
 
-Creates a JTS Geometry LineString from a JTS Point Geometries events representing a trajectory.
+Stores a geometry in the internal PostGIS database. The geofence can be extracted with the Geofence Enricher and the chosen geofence table.
 
 
 ### 1st parameter
-Point WKT String
+Geometry WKT String
 
 ### 2nd parameter
 EPSG code value
 
 ### 3rd parameter
-M-value for each sub-point of the trajectory
-
-### 4rd parameter
-String for a description text for the trajectory
-
-### 5rd parameter
-Number of allowed sub-points
+Name of the geofence. It has to be unique among all other geofences.
 
 ***
 
-## Output
-
-Adds a LineString geometry in the Well Known Text to the event, representing a trajectory. Also the description text is added to the event stream. The first existing event creates an empty LineString.
-
 ### Example
-Creating a LineString with a threshold of 2 allowed sub-points:
-
-* First Event:
-  * Point(8.12 41.23) --> LineString <empty>
-* Second Event:
-  * Point(8.56 41.25) --> LineString(8.12 41.23, 8.56 41.25)
-* Second Event:
-  * Point(8.84 40.98) --> LineString(8.56 41.25, 8.84 40.98)
-
-M-value is not represented in the LineString but will be stored for internal use!
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png
index 7389006..810f80c 100644
Binary files a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png and b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png differ
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
index 6dba4e1..b878b35 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
@@ -1,20 +1,11 @@
-org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.title=Single Trajectory Creator
-org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.description=Creates a trajectory from JTS point events
+org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.title=Geofence Sink
+org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.description=Stores a single geometry in a geofence PostGIS table
 
-point-key.title=JTS Point Event
-point-key.description=Single Point Event which will be added to the trajectory
+geometry-key.title=JTS Geometry Event
+geometry-key.description=Single Geometry which will be stored in the sink
 
-point-key.title=JTS Point Event
-point-key.description=Single Point Event which will be added to the trajectory
+epsg-key.title=EPSG-Code
+epsg-key.description=EPSG-Code of the geometry
 
-epsg-key.title=CRS of Input Point
-epsg-key.description=EPSG-Code of input point
-
-m-key.title=measurement value
-m-key.description=Measurement value which will be stored with each point event
-
-description-key.title=description text of trajectory
-description-key.description=A description text for the trajectory
-
-subpoints-key.title=number of allowed sub-points
-subpoints-key.description=amount of allowed sub-points, creating the trajectory
+geofence-key.title=Geofence Name
+geofence-key.description=Unique name of the geofence
\ No newline at end of file


[incubator-streampipes-extensions] 01/02: geofence part1

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/postgis-sink
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit bf1c03fb6e4173289464031fe732e5819bc3f638
Author: micklich <fl...@disy.net>
AuthorDate: Mon Jun 1 06:42:11 2020 +0200

    geofence part1
---
 .../streampipes/processors/geo/jvm/GeoJvmInit.java |   4 +-
 .../geo/jvm/jts/geofence/PostgresJdbcClient.java   | 682 +++++++++++++++++++++
 .../geo/jvm/jts/geofence/SpGeofencelDatabase.java  | 197 ++++++
 .../jvm/jts/geofence/storing/StoreGeofence.java    |  51 ++
 .../geofence/storing/StoreGeofenceController.java  |  59 ++
 .../geofence/storing/StoreGeofenceParameters.java  |  34 +
 .../documentation.md                               |  77 +++
 .../icon.png                                       | Bin 0 -> 9399 bytes
 .../strings.en                                     |  20 +
 .../jvm/postgresql/PostgresJdbcClient.java         |   2 +-
 10 files changed, 1124 insertions(+), 2 deletions(-)

diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index 41b30ce..6cd8944 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.processors.geo.jvm.config.GeoJvmConfig;
+import org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.StoreGeofenceController;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoController;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.setEPSG.SetEpsgController;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory.CreateTrajectoryFromPointsController;
@@ -50,7 +51,8 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
             .add(new LatLngToGeoController())
             .add(new CreateTrajectoryFromPointsController())
             .add(new SpeedCalculatorController())
-            .add(new StaticDistanceCalculatorController());
+            .add(new StaticDistanceCalculatorController())
+            .add(new StoreGeofenceController());
 
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/PostgresJdbcClient.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/PostgresJdbcClient.java
new file mode 100644
index 0000000..108da08
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/PostgresJdbcClient.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.geofence;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyNested;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.vocabulary.XSD;
+
+import java.sql.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class PostgresJdbcClient {
+  // Regular expression that database, schema, table and column names have to match (see checkRegEx)
+  private String allowedRegEx;
+
+  // Target table/schema and the credentials passed to every DriverManager.getConnection call
+  protected String tableName;
+  protected String schemaName;
+  protected String user;
+  protected String password;
+
+  // Set to true once ensureTableExists/ensureSchemaExists (or save) has handled the table/schema
+  protected boolean tableExists = false;
+  protected boolean schemaExists = false;
+  // When true, createTable() issues DROP TABLE IF EXISTS before CREATE TABLE
+  protected boolean isToDropTable = false;
+
+  protected Logger logger;
+
+  // Shared JDBC handles; all three are closed and reset to null by closeAll()
+  protected Connection c = null;
+  protected Statement st = null;
+  protected PreparedStatement ps = null;
+
+  /**
+   * The list of properties extracted from the graph
+   */
+  protected List<EventProperty> eventProperties;
+  /**
+   * The parameters in the prepared statement {@code ps} together with their index and data type
+   */
+  protected HashMap<String, Parameterinfo> parameters = new HashMap<>();
+
+  /**
+   * The SQL column types this client creates and binds: INT, BIGINT, FLOAT, DOUBLE PRECISION,
+   * TEXT and BOOLEAN. Everything that cannot be mapped to one of the other types is stored as TEXT.
+   */
+  protected enum SqlAttribute {
+    INTEGER("INT", Types.INTEGER),
+    LONG("BIGINT", Types.BIGINT),
+    FLOAT("FLOAT", Types.FLOAT),
+    DOUBLE("DOUBLE PRECISION", Types.DOUBLE),
+    STRING("TEXT", Types.VARCHAR),
+    BOOLEAN("BOOLEAN", Types.BOOLEAN);
+
+    // Type name as it is written into the CREATE TABLE statement
+    private final String sqlName;
+    // java.sql.Types constant used when a SQL NULL has to be bound for this type
+    private final int jdbcType;
+
+    SqlAttribute(String s, int jdbcType) {
+      this.sqlName = s;
+      this.jdbcType = jdbcType;
+    }
+
+    /**
+     * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
+     * interpreted as a String (TEXT).
+     *
+     * @param o The object which should be identified
+     * @return An {@link SqlAttribute} of the identified type
+     */
+    public static SqlAttribute getFromObject(final Object o) {
+      if (o instanceof Integer) {
+        return SqlAttribute.INTEGER;
+      }
+      if (o instanceof Long) {
+        return SqlAttribute.LONG;
+      }
+      if (o instanceof Float) {
+        return SqlAttribute.FLOAT;
+      }
+      if (o instanceof Double) {
+        return SqlAttribute.DOUBLE;
+      }
+      if (o instanceof Boolean) {
+        return SqlAttribute.BOOLEAN;
+      }
+      return SqlAttribute.STRING;
+    }
+
+    /**
+     * Maps an XSD datatype URI to the matching SQL type. Unknown URIs are mapped to a String (TEXT).
+     *
+     * @param s The runtime type URI of a property
+     * @return An {@link SqlAttribute} of the identified type
+     */
+    public static SqlAttribute getFromUri(final String s) {
+      if (s.equals(XSD._integer.toString())) {
+        return SqlAttribute.INTEGER;
+      }
+      if (s.equals(XSD._long.toString())) {
+        return SqlAttribute.LONG;
+      }
+      if (s.equals(XSD._float.toString())) {
+        return SqlAttribute.FLOAT;
+      }
+      if (s.equals(XSD._double.toString())) {
+        return SqlAttribute.DOUBLE;
+      }
+      if (s.equals(XSD._boolean.toString())) {
+        return SqlAttribute.BOOLEAN;
+      }
+      return SqlAttribute.STRING;
+    }
+
+    /**
+     * Sets the value in the prepared statement {@code ps}. A {@code null} value is bound as SQL NULL
+     * instead of failing with a NullPointerException.
+     *
+     * @param p     The needed info about the parameter (index and type)
+     * @param value The value which should be bound to the parameter, may be {@code null}
+     * @param ps    The prepared statement, which will be filled
+     * @throws SpRuntimeException When the data type in {@code p} is unknown
+     * @throws SQLException       When the setters of the statement throw an
+     *                            exception (e.g. {@code setInt()})
+     */
+    public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
+        throws SQLException, SpRuntimeException {
+      if (value == null) {
+        ps.setNull(p.index, p.type.jdbcType);
+        return;
+      }
+      switch (p.type) {
+        case INTEGER:
+          ps.setInt(p.index, (Integer) value);
+          break;
+        case LONG:
+          ps.setLong(p.index, (Long) value);
+          break;
+        case FLOAT:
+          ps.setFloat(p.index, (Float) value);
+          break;
+        case DOUBLE:
+          // The parameter type is derived from the first event; a later whole-number value may
+          // arrive as Integer/Long. Widening any Number to double is accepted here.
+          ps.setDouble(p.index, ((Number) value).doubleValue());
+          break;
+        case BOOLEAN:
+          ps.setBoolean(p.index, (Boolean) value);
+          break;
+        case STRING:
+          ps.setString(p.index, value.toString());
+          break;
+        default:
+          throw new SpRuntimeException("Unknown SQL datatype");
+      }
+    }
+
+    @Override
+    public String toString() {
+      return sqlName;
+    }
+  }
+
+  /**
+   * Describes one placeholder of a prepared statement: its position in the statement and the
+   * SQL type that decides which setter is used to bind the value.
+   */
+  protected static class Parameterinfo {
+    private final int index;
+    private final SqlAttribute type;
+
+    public Parameterinfo(final int index, final SqlAttribute type) {
+      this.type = type;
+      this.index = index;
+    }
+  }
+
+
+  // No-argument constructor; the client is configured and connected later via initializeJdbc(...)
+  public PostgresJdbcClient() {
+  }
+
+  /**
+   * Stores the given configuration in the fields of this client, loads the JDBC driver class and
+   * then connects via {@link PostgresJdbcClient#connect(String, int, String, String)}.
+   *
+   * @throws SpRuntimeException If the driver class cannot be found or if connecting fails
+   */
+  protected void initializeJdbc(List<EventProperty> eventProperties,
+                                String host,
+                                Integer port,
+                                String databaseName,
+                                String tableName,
+                                String user,
+                                String password,
+                                String allowedRegEx,
+                                String driver,
+                                String urlName,
+                                Logger logger,
+                                String schemaName,
+                                boolean isToDropTable) throws SpRuntimeException {
+    // what is written and how it is validated
+    this.eventProperties = eventProperties;
+    this.schemaName = schemaName;
+    this.tableName = tableName;
+    this.isToDropTable = isToDropTable;
+    this.allowedRegEx = allowedRegEx;
+    // how the server is accessed
+    this.user = user;
+    this.password = password;
+    this.logger = logger;
+
+    boolean driverAvailable = true;
+    try {
+      Class.forName(driver);
+    } catch (ClassNotFoundException e) {
+      driverAvailable = false;
+    }
+    if (!driverAvailable) {
+      throw new SpRuntimeException("Driver '" + driver + "' not found.");
+    }
+
+    connect(host, port, urlName, databaseName);
+  }
+
+
+  /**
+   * Connects to the database server and makes sure that the database, the schema and the table
+   * exist. On success {@link PostgresJdbcClient#c} and {@link PostgresJdbcClient#st} are initialized.
+   *
+   * @param host         Host name or address of the database server
+   * @param port         Port of the database server
+   * @param urlName      JDBC sub-protocol that is placed after "jdbc:" in the connection URL
+   * @param databaseName Name of the database that is created if necessary and used afterwards
+   * @throws SpRuntimeException When the connection could not be established (because of a
+   *                            wrong identification, missing database etc.)
+   */
+  protected void connect(String host, int port, String urlName, String databaseName) throws SpRuntimeException {
+    String url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
+    try {
+      c = DriverManager.getConnection(url, user, password);
+      ensureDatabaseExists(url, databaseName);
+      ensureSchemaExists(url, databaseName);
+      ensureTableExists(url, databaseName);
+    } catch (SQLException e) {
+      // getSQLState() may return null, so it must not be dereferenced blindly
+      String sqlState = e.getSQLState() == null ? "" : e.getSQLState();
+      if (sqlState.startsWith("08")) {
+        // host or port is wrong -- Class 08  Connection Exception
+        throw new SpRuntimeException("Connection can't be established. Check host or port setting: \n" + e.getMessage());
+      } else if (sqlState.startsWith("28")) {
+        // username or password is wrong -- Class 28  Invalid Authorization Specification
+        throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
+      } else {
+        throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * If this method returns successfully a database with the given name exists on the server.
+   * The database is created unconditionally; an error of SQLSTATE class 42 (which includes
+   * "database already exists") is treated as success. The current connection is always closed
+   * afterwards, because the following steps connect to the database itself.
+   *
+   * @param url          The JDBC url of the server (not used by this implementation)
+   * @param databaseName The name of the database that should exist
+   * @throws SpRuntimeException If the name is not allowed or if the database does not exist and
+   *                            could not be created
+   */
+  protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
+    checkRegEx(databaseName, "databasename");
+
+    try {
+      // There is no explicit existence check (using catalogs has not worked with postgres)
+      st = c.createStatement();
+      st.executeUpdate("CREATE DATABASE " + databaseName + ";");
+      logger.info("Created new database '" + databaseName + "'");
+    } catch (SQLException e1) {
+      // getSQLState() may return null; only class 42 errors are tolerated
+      String sqlState = e1.getSQLState();
+      if (sqlState == null || !sqlState.startsWith("42")) {
+        throw new SpRuntimeException("Error while creating database: " + e1.getMessage());
+      }
+    } finally {
+      // release the server-level connection on success as well as on failure
+      closeAll();
+    }
+  }
+
+  /**
+   * If this method returns successfully a schema with the name in {@link PostgresJdbcClient#schemaName}
+   * exists in the database with the given name. A new connection to that database is opened and
+   * kept in {@link PostgresJdbcClient#c}; a connection that is still open is closed first.
+   *
+   * @param url          The JDBC url of the server without the database name (e.g. "jdbc:postgresql://127.0.0.1:5432/")
+   * @param databaseName The database in which the schema should exist
+   * @throws SpRuntimeException If the schema does not exist and could not be created
+   */
+  protected void ensureSchemaExists(String url, String databaseName) throws SpRuntimeException {
+    try {
+      if (c != null) {
+        // do not leak a connection that was opened earlier
+        closeAll();
+      }
+      // Database should exist by now so we can establish a connection
+      c = DriverManager.getConnection(url + databaseName, user, password);
+      st = c.createStatement();
+
+      boolean isItExisting = false;
+      try (ResultSet rs = c.getMetaData().getSchemas()) {
+        while (!isItExisting && rs.next()) {
+          // locale independent, case-insensitive comparison
+          isItExisting = schemaName.equalsIgnoreCase(rs.getString("TABLE_SCHEM"));
+        }
+      }
+
+      if (!isItExisting) {
+        createSchema();
+      }
+
+      schemaExists = true;
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+  /**
+   * If this method returns successfully a table with the name in {@link PostgresJdbcClient#tableName}
+   * exists in the schema {@link PostgresJdbcClient#schemaName} of the given database. A missing
+   * table is created. An existing table is re-created when {@link PostgresJdbcClient#isToDropTable}
+   * is set and validated afterwards.
+   *
+   * @param url          The JDBC url of the server without the database name (e.g. "jdbc:postgresql://127.0.0.1:5432/")
+   * @param databaseName The database in which the table should exist
+   * @throws SpRuntimeException If the table does not exist and could not be created
+   */
+  protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
+    try {
+      if (c != null) {
+        // do not leak a connection that was opened earlier
+        closeAll();
+      }
+      // Database should exist by now so we can establish a connection
+      c = DriverManager.getConnection(url + databaseName, user, password);
+      st = c.createStatement();
+
+      // The table name is used unquoted in CREATE TABLE, so it is stored in lower case
+      String storedName = tableName.toLowerCase(java.util.Locale.ROOT);
+      boolean existsInSchema = false;
+      try (ResultSet rs = c.getMetaData().getTables(null, null, storedName, null)) {
+        while (!existsInSchema && rs.next()) {
+          // same table names can exist in different schemas, only our schema counts
+          existsInSchema = schemaName.equalsIgnoreCase(rs.getString("TABLE_SCHEM"))
+              && storedName.equalsIgnoreCase(rs.getString("TABLE_NAME"));
+        }
+      }
+
+      if (existsInSchema) {
+        if (isToDropTable) {
+          // createTable() drops the existing table before creating it again
+          createTable();
+        }
+        validateTable();
+      } else {
+        createTable();
+      }
+      tableExists = true;
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+  /**
+   * Clears, fills and executes the saved prepared statement {@code ps} with the data found in
+   * event. To fill in the values it calls {@link PostgresJdbcClient#fillPreparedStatement(Map)}.
+   *
+   * @param event Data to be saved in the SQL table
+   * @throws SQLException       When the statement cannot be executed
+   * @throws SpRuntimeException When no connection is established, when no statement could be
+   *                            generated for the event, when a name is not allowed or when it is thrown
+   *                            by {@link SqlAttribute#setValue(Parameterinfo, Object, PreparedStatement)}
+   */
+  private void executePreparedStatement(final Map<String, Object> event)
+      throws SQLException, SpRuntimeException {
+    checkConnected();
+    if (ps != null) {
+      ps.clearParameters();
+    }
+    fillPreparedStatement(event);
+    if (ps == null) {
+      // the statement is generated from the fields of the event; without it nothing can be executed
+      throw new SpRuntimeException("No statement could be generated for the event, it contains no fields");
+    }
+    ps.executeUpdate();
+  }
+
+  /**
+   * Writes the event into the table. Schema and table are created first if they are not marked as
+   * existing. If the insert fails with an error of SQLSTATE class 42, the table is created again
+   * and the insert is retried once.
+   *
+   * @param event The event which should be saved to the Postgres table
+   * @throws SpRuntimeException When there is no connection, the event is {@code null} or there was
+   *                            an error in the saving process
+   */
+  protected void save(final Event event) throws SpRuntimeException {
+    //TODO: Add batch support (https://stackoverflow.com/questions/3784197/efficient-way-to-do-batch-inserts-with-jdbc)
+    checkConnected();
+    // validate before the event is dereferenced
+    if (event == null) {
+      throw new SpRuntimeException("event is null");
+    }
+    Map<String, Object> eventMap = event.getRaw();
+
+    if (!schemaExists) {
+      // Creates the schema
+      createSchema();
+      schemaExists = true;
+    }
+
+    if (!tableExists) {
+      // Creates the table
+      createTable();
+      tableExists = true;
+    }
+    try {
+      executePreparedStatement(eventMap);
+    } catch (SQLException e) {
+      // getSQLState() may return null, which is treated like any other unhandled error
+      String sqlState = e.getSQLState();
+      if (sqlState == null || !sqlState.startsWith("42")) {
+        throw new SpRuntimeException(e.getMessage());
+      }
+      // If the table does not exist (because it got deleted or something, will cause the error
+      // code "42") we will try to create a new one. Otherwise we do not handle the exception.
+      logger.warn("Table '" + tableName + "' was unexpectedly not found and gets recreated.");
+      tableExists = false;
+      createTable();
+      tableExists = true;
+
+      try {
+        executePreparedStatement(eventMap);
+      } catch (SQLException e1) {
+        throw new SpRuntimeException(e1.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Binds all values of the event to the prepared statement {@code ps}. If there is no statement
+   * yet or {@link PostgresJdbcClient#parameters} does not cover every field of the event (which
+   * should only happen once in the beginning), the statement is generated from the complete
+   * event BEFORE the first value is bound, so no binding is lost.
+   *
+   * @param event The complete (root) event
+   * @throws SQLException       When the statement cannot be generated or a value cannot be bound
+   * @throws SpRuntimeException When a name is not allowed or a field has no matching parameter
+   */
+  private void fillPreparedStatement(final Map<String, Object> event)
+      throws SQLException, SpRuntimeException {
+    if (!event.isEmpty() && (ps == null || !hasAllParameters(event, ""))) {
+      generatePreparedStatement(event);
+    }
+    fillPreparedStatement(event, "");
+  }
+
+  /**
+   * Checks whether every (nested) field of the event has an entry in {@link PostgresJdbcClient#parameters}.
+   *
+   * @param event The event or a nested part of it
+   * @param pre   The prefix built from the names of the enclosing fields
+   * @return {@code true} if no field is missing
+   */
+  @SuppressWarnings("unchecked")
+  private boolean hasAllParameters(final Map<String, Object> event, String pre) {
+    for (Map.Entry<String, Object> pair : event.entrySet()) {
+      String newKey = pre + pair.getKey();
+      if (pair.getValue() instanceof Map) {
+        if (!hasAllParameters((Map<String, Object>) pair.getValue(), newKey + "_")) {
+          return false;
+        }
+      } else if (!parameters.containsKey(newKey)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Binds the values of the event (or of a nested part of it) based on
+   * {@link PostgresJdbcClient#parameters}.
+   *
+   * @param event The event or a nested part of it
+   * @param pre   The prefix built from the names of the enclosing fields
+   * @throws SQLException       When a value cannot be bound
+   * @throws SpRuntimeException When a field has no matching parameter or its type is unknown
+   */
+  @SuppressWarnings("unchecked")
+  private void fillPreparedStatement(final Map<String, Object> event, String pre)
+      throws SQLException, SpRuntimeException {
+    //TODO: Possible error: when the event does not contain all objects of the parameter list
+    for (Map.Entry<String, Object> pair : event.entrySet()) {
+      String newKey = pre + pair.getKey();
+      if (pair.getValue() instanceof Map) {
+        // recursively extracts nested values
+        fillPreparedStatement((Map<String, Object>) pair.getValue(), newKey + "_");
+      } else {
+        Parameterinfo p = parameters.get(newKey);
+        if (p == null) {
+          throw new SpRuntimeException("No column is known for the event field '" + newKey + "'");
+        }
+        SqlAttribute.setValue(p, pair.getValue(), ps);
+      }
+    }
+  }
+
+  /**
+   * Initializes the variables {@link PostgresJdbcClient#parameters} and {@link PostgresJdbcClient#ps}
+   * according to the parameter event. A statement generated earlier is closed first.
+   *
+   * @param event The event which is getting analyzed
+   * @throws SpRuntimeException When the table, schema or a column name is not allowed
+   * @throws SQLException       When the statement cannot be prepared or the old one cannot be closed
+   */
+  private void generatePreparedStatement(final Map<String, Object> event)
+      throws SQLException, SpRuntimeException {
+    // input: event
+    // wanted: INSERT INTO schema.test4321 ( "randomString", "randomValue" ) VALUES ( ?,? );
+    checkConnected();
+    checkRegEx(tableName, "Tablename");
+    checkRegEx(schemaName, "Schemaname");
+
+    if (ps != null) {
+      // release the statement that is about to be replaced
+      ps.close();
+      ps = null;
+    }
+    parameters.clear();
+
+    StringBuilder statement1 = new StringBuilder("INSERT INTO ");
+    StringBuilder statement2 = new StringBuilder("VALUES ( ");
+    statement1.append(schemaName).append(".");
+    statement1.append(tableName).append(" ( ");
+
+    // Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
+    extendPreparedStatement(event, statement1, statement2, 1);
+
+    statement1.append(" ) ");
+    statement2.append(" );");
+    String finalStatement = statement1.append(statement2).toString();
+    ps = c.prepareStatement(finalStatement);
+  }
+
+  /**
+   * Appends the columns and placeholders for all fields of the event, starting without a column
+   * prefix and without a leading separator.
+   *
+   * @return The index of the next free placeholder
+   */
+  private int extendPreparedStatement(final Map<String, Object> event,
+                                      StringBuilder s1, StringBuilder s2, int index) throws SpRuntimeException {
+    return extendPreparedStatement(event, s1, s2, index, "", "");
+  }
+
+  /**
+   * Appends one quoted column name to {@code s1} and one placeholder to {@code s2} for every
+   * field of the event and registers the field in {@link PostgresJdbcClient#parameters} under its
+   * full column name. Nested maps are handled recursively; their columns are named
+   * "parent_child", which is the key that is looked up when the values are bound.
+   *
+   * @param event       The event or a nested part of it
+   * @param s1          The "INSERT INTO ... (" part, extended with the column names
+   * @param s2          The "VALUES (" part, extended with the placeholders
+   * @param index       The index of the next free placeholder
+   * @param preProperty The prefix built from the names of the enclosing fields
+   * @param pre         The separator to put in front of the next column ("" for the first one)
+   * @return The index of the next free placeholder
+   * @throws SpRuntimeException When there is no connection or a field name is not allowed
+   */
+  @SuppressWarnings("unchecked")
+  private int extendPreparedStatement(final Map<String, Object> event,
+                                      StringBuilder s1, StringBuilder s2, int index, String preProperty, String pre)
+      throws SpRuntimeException {
+    checkConnected();
+    for (Map.Entry<String, Object> pair : event.entrySet()) {
+      // every name ends up inside a quoted column name, nested ones included
+      checkRegEx(pair.getKey(), "Columnname");
+      String column = preProperty + pair.getKey();
+      if (pair.getValue() instanceof Map) {
+        int before = index;
+        index = extendPreparedStatement((Map<String, Object>) pair.getValue(), s1, s2, index,
+            column + "_", pre);
+        if (index > before) {
+          // a separator is only needed if the nested map contributed at least one column
+          pre = ", ";
+        }
+      } else {
+        parameters.put(column, new Parameterinfo(index, SqlAttribute.getFromObject(pair.getValue())));
+        s1.append(pre).append("\"").append(column).append("\"");
+        s2.append(pre).append("?");
+        index++;
+        pre = ", ";
+      }
+    }
+    return index;
+  }
+
+  /**
+   * Creates a schema with the name {@link PostgresJdbcClient#schemaName}
+   *
+   * @throws SpRuntimeException If there is no connection, if the {@link PostgresJdbcClient#schemaName}
+   *                            is not allowed or if executeUpdate throws an SQLException
+   */
+  protected void createSchema() throws SpRuntimeException {
+    checkConnected();
+    // the schema name is concatenated into the statement, so it is the name that has to be validated
+    checkRegEx(schemaName, "Schemaname");
+
+    String statement = "CREATE SCHEMA " + schemaName + ";";
+    try {
+      st.executeUpdate(statement);
+    } catch (SQLException e) {
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+  /**
+   * Creates a table with the name {@link PostgresJdbcClient#tableName} and the
+   * properties {@link PostgresJdbcClient#eventProperties}. Calls
+   * {@link PostgresJdbcClient#extractEventProperties(List)} internally with the
+   * {@link PostgresJdbcClient#eventProperties} to extract all possible columns. If
+   * {@link PostgresJdbcClient#isToDropTable} is set, an existing table is dropped first.
+   *
+   * @throws SpRuntimeException If there is no connection, if the table or schema name is not allowed, if
+   *                            executeUpdate throws an SQLException or if {@link PostgresJdbcClient#extractEventProperties(List)}
+   *                            throws an exception
+   */
+  protected void createTable() throws SpRuntimeException {
+    checkConnected();
+    // both names are concatenated into the statements below
+    checkRegEx(tableName, "Tablename");
+    checkRegEx(schemaName, "Schemaname");
+
+    String qualifiedName = schemaName + "." + tableName;
+
+    if (isToDropTable) {
+      try {
+        st.executeUpdate("DROP TABLE IF EXISTS " + qualifiedName + ";");
+      } catch (SQLException e) {
+        throw new SpRuntimeException(e.getMessage());
+      }
+    }
+
+    StringBuilder statement = new StringBuilder("CREATE TABLE ");
+    statement.append(qualifiedName).append(" ( ");
+    statement.append(extractEventProperties(eventProperties)).append(" );");
+
+    try {
+      st.executeUpdate(statement.toString());
+    } catch (SQLException e) {
+      // constant first: getSQLState() may return null
+      if ("42P07".equals(e.getSQLState())) {
+        throw new SpRuntimeException("Table already exists. Change option \"DROP TABLE\" to prevent this error. Error Message: " + e.getMessage());
+      } else {
+        throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Creates the column definitions of a CREATE TABLE statement for the given properties. Calls
+   * {@link PostgresJdbcClient#extractEventProperties(List, String)} with an empty prefix.
+   *
+   * @param properties The list of properties which should be included in the query
+   * @return A StringBuilder with the column definitions, e.g. {@code "randomString" TEXT, "randomValue" INT}
+   * @throws SpRuntimeException See {@link PostgresJdbcClient#extractEventProperties(List, String)} for details
+   */
+  private StringBuilder extractEventProperties(List<EventProperty> properties)
+      throws SpRuntimeException {
+    return extractEventProperties(properties, "");
+  }
+
+  /**
+   * Creates the column definitions of a CREATE TABLE statement for the given properties. Every
+   * runtime name is validated with {@link PostgresJdbcClient#checkRegEx(String, String)} as
+   * protection against SQL injection. For nested properties it recursively extracts the
+   * information. Properties that are neither nested nor primitive are stored as a string (so in
+   * SQL as TEXT). For each type it uses {@link SqlAttribute#getFromUri(String)} internally to
+   * identify the SQL-type from the runtimeType.
+   *
+   * @param properties  The list of properties which should be included in the query
+   * @param preProperty A string which gets prepended to all property runtimeNames
+   * @return A StringBuilder with the column definitions, e.g. {@code "randomString" TEXT, "randomValue" INT}
+   * @throws SpRuntimeException If the runtimeName of any property is not allowed
+   */
+  private StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+      throws SpRuntimeException {
+    StringBuilder s = new StringBuilder();
+    // separator in front of the next column; stays empty until the first column was written
+    String pre = "";
+    for (EventProperty property : properties) {
+      // Protection against SqlInjection
+      checkRegEx(property.getRuntimeName(), "Column name");
+
+      if (property instanceof EventPropertyNested) {
+        // if it is a nested property, recursively extract the needed properties
+        StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
+            preProperty + property.getRuntimeName() + "_");
+        if (tmp.length() > 0) {
+          s.append(pre).append(tmp);
+          // only a nested property that produced columns requires a separator afterwards
+          pre = ", ";
+        }
+      } else {
+        // Adding the name of the property (e.g. "randomString")
+        // Or for properties in a nested structure: input1_randomValue
+        s.append(pre).append("\"").append(preProperty).append(property.getRuntimeName()).append("\" ");
+
+        // adding the type of the property (e.g. "TEXT")
+        if (property instanceof EventPropertyPrimitive) {
+          s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+        } else {
+          // Must be an EventPropertyList then
+          s.append(SqlAttribute.getFromUri(XSD._string.toString()));
+        }
+        pre = ", ";
+      }
+    }
+
+    return s;
+  }
+
+  /**
+   * Checks if the input string is allowed (not {@code null}, length > 0 and regEx match)
+   *
+   * @param input           String which is getting matched with the regEx
+   * @param regExIdentifier Information about the use of the input. Gets included in the exception message
+   * @throws SpRuntimeException If {@code input} is {@code null}, if its length is 0 or if it does
+   *                            not match with {@link PostgresJdbcClient#allowedRegEx}
+   */
+  protected final void checkRegEx(String input, String regExIdentifier) throws SpRuntimeException {
+    // null is rejected like any other invalid name instead of causing a NullPointerException
+    if (input == null || input.length() == 0 || !input.matches(allowedRegEx)) {
+      throw new SpRuntimeException(regExIdentifier + " '" + input
+          + "' not allowed (allowed: '" + allowedRegEx + "') with a min length of 1");
+    }
+  }
+
+  protected void validateTable() throws SpRuntimeException {
+    //TODO: Add validation of an existing table
+    if (false) {
+      throw new SpRuntimeException("Table '" + tableName + "' does not match the eventproperties");
+    }
+  }
+
+  /**
+   * Closes all open statements and the connection of JDBC and resets the fields to {@code null}.
+   * Both statements are closed before the connection they belong to. Errors are logged as
+   * warnings and do not prevent the remaining resources from being closed.
+   */
+  protected void closeAll() {
+    boolean error = false;
+    try {
+      if (st != null) {
+        st.close();
+        st = null;
+      }
+    } catch (SQLException e) {
+      error = true;
+      logger.warn("Exception when closing the statement: " + e.getMessage());
+    }
+    try {
+      if (ps != null) {
+        ps.close();
+        ps = null;
+      }
+    } catch (SQLException e) {
+      error = true;
+      logger.warn("Exception when closing the prepared statement: " + e.getMessage());
+    }
+    // the connection goes last, after everything that was created from it
+    try {
+      if (c != null) {
+        c.close();
+        c = null;
+      }
+    } catch (SQLException e) {
+      error = true;
+      logger.warn("Exception when closing the connection: " + e.getMessage());
+    }
+    if (!error) {
+      logger.info("Shutdown all connections successfully.");
+    }
+  }
+
+  protected void checkConnected() throws SpRuntimeException {
+    if (c == null) {
+      throw new SpRuntimeException("Connection is not established.");
+    }
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
new file mode 100644
index 0000000..ca76166
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/SpGeofencelDatabase.java
@@ -0,0 +1,197 @@
+package org.apache.streampipes.processors.geo.jvm.jts.geofence;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.locationtech.jts.geom.Geometry;
+
+import java.sql.*;
+import java.time.LocalDateTime;
+
+/**
+ * JDBC access to the geofence registry table {@code geofences}. One row is kept per geofence
+ * name: it is inserted when the client connects, its geometry is updated with
+ * {@link #updateGeofenceTable(Geometry)} and it is removed with {@link #deleteTableEntry()}.
+ *
+ * NOTE(review): host, port, database, user and password are hard-coded below. The credentials
+ * in particular should come from the service configuration instead of the source code.
+ */
+public class SpGeofencelDatabase extends PostgresJdbcClient {
+
+  private String host;
+  private Integer port;
+  private String dbname;
+  private String schema;
+  private String driver;
+  private String urlname;
+  private String url;
+
+  // user, password, logger and schemaName are inherited; re-declaring them here would leave the
+  // inherited fields (which the inherited methods read) unset
+  protected String allowedRegEx;
+  protected String geofenceName;
+
+
+  // DB Tables
+  protected final String geofenceMainTable = "geofences";
+  private final String PG_COLUMN_ID = "id";
+  private final String PG_COLUMN_CREATED = "created_at";
+  private final String PG_COLUMN_UPDATED = "updated_at";
+  private final String PG_COLUMN_GEOFENCENAME = "geofencename";
+  private final String PG_COLUMN_GEOMETRY = "geom";
+
+
+  public SpGeofencelDatabase() {
+    super();
+  }
+
+  /**
+   * Configures the client for the geofence database, loads the driver and connects.
+   *
+   * @param allowedRegEx Regular expression that schema and table names have to match
+   * @param geofenceName Name of the geofence that is registered in the geofence table
+   * @param logger       Logger used by this client
+   * @throws SpRuntimeException If the driver is missing, the database cannot be reached or the
+   *                            geofence cannot be registered
+   */
+  protected void initializeJdbc(String allowedRegEx,
+                                String geofenceName,
+                                Logger logger) throws SpRuntimeException {
+    this.allowedRegEx = allowedRegEx;
+    this.geofenceName = geofenceName;
+
+    this.host = "localhost";
+    this.port = 54321;
+    this.dbname = "geo_streampipes";
+    this.schema = "geofence";
+    this.driver = "org.postgresql.Driver";
+    this.urlname = "postgresql";
+    this.url = "jdbc:" + urlname + "://" + host + ":" + port + "/";
+
+    // The inherited initialization stores user, password, schema name, table name, logger and the
+    // name pattern in the inherited fields, loads the driver and finally calls
+    // connect(host, port, urlName, databaseName), which is overridden below.
+    super.initializeJdbc(java.util.Collections.emptyList(),
+        host,
+        port,
+        dbname,
+        geofenceMainTable,
+        "geo_streampipes",
+        "bQgu\"FUR_VH6z>~j",
+        allowedRegEx,
+        driver,
+        urlname,
+        logger,
+        schema,
+        false);
+  }
+
+  /**
+   * Connects with the settings stored by {@link #initializeJdbc(String, String, Logger)}.
+   *
+   * @throws SpRuntimeException See {@link #connect(String, int, String, String)}
+   */
+  protected void connect() throws SpRuntimeException {
+    connect(host, port, urlname, dbname);
+  }
+
+  /**
+   * Connects to the geofence database, makes sure that schema and geofence table exist and
+   * registers the geofence name. Unlike the inherited implementation it neither creates the
+   * database nor an event table.
+   *
+   * @throws SpRuntimeException If the database cannot be reached, schema or table cannot be
+   *                            created or the geofence name cannot be registered
+   */
+  @Override
+  protected void connect(String host, int port, String urlName, String databaseName) throws SpRuntimeException {
+    this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
+    this.dbname = databaseName;
+
+    // Probe the service first so that an unreachable database is reported with a clear message.
+    // The probe connection is closed immediately.
+    try (Connection probe = DriverManager.getConnection(url + dbname, user, password)) {
+      probe.isValid(0);
+    } catch (SQLException e) {
+      throw new SpRuntimeException("PostGIS Service is not implemented: " + e.getMessage());
+    }
+
+    // opens the connection that is kept in "c" and used by all following statements
+    ensureSchemaExists(url, dbname);
+    createGeofenceTable();
+    createGeofenceTables();
+  }
+
+  /**
+   * Opens the connection to the geofence database if there is none or if it has been closed.
+   */
+  private void reconnectIfNecessary() throws SQLException {
+    if (c == null || c.isClosed()) {
+      c = DriverManager.getConnection(url + dbname, user, password);
+    }
+  }
+
+  /**
+   * Checks whether the geofence table exists in the schema of this client.
+   */
+  private boolean geofenceTableExists() throws SQLException {
+    try (ResultSet rs = c.getMetaData().getTables(null, null, geofenceMainTable, null)) {
+      while (rs.next()) {
+        // same table names can exist in different schemas
+        if (schemaName.equalsIgnoreCase(rs.getString("TABLE_SCHEM"))) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Creates the geofence table if it does not exist in the schema yet.
+   *
+   * @return {@code true} if the table was created, {@code false} if it already existed
+   * @throws SpRuntimeException If the table cannot be looked up or created
+   */
+  private boolean createGeofenceTable() throws SpRuntimeException {
+    try {
+      reconnectIfNecessary();
+      if (geofenceTableExists()) {
+        return false;
+      }
+
+      StringBuilder statement = new StringBuilder("CREATE TABLE ");
+      statement.append(schemaName).append(".").append(geofenceMainTable).append(" ( ");
+      statement.append(PG_COLUMN_ID).append(" SERIAL PRIMARY KEY, ");
+      statement.append(PG_COLUMN_CREATED).append(" TIMESTAMP, ");
+      statement.append(PG_COLUMN_UPDATED).append(" TIMESTAMP, ");
+      statement.append(PG_COLUMN_GEOFENCENAME).append(" TEXT NOT NULL UNIQUE, ");
+      statement.append(PG_COLUMN_GEOMETRY).append(" GEOMETRY");
+      statement.append(" );");
+
+      try (Statement create = c.createStatement()) {
+        create.executeUpdate(statement.toString());
+      } catch (SQLException e) {
+        throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+      }
+      return true;
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+  /**
+   * Registers the geofence name of this client as a new row of the geofence table.
+   *
+   * @throws SpRuntimeException If the name is already taken or the row cannot be inserted
+   */
+  public void createGeofenceTables() throws SpRuntimeException {
+    String sql = "INSERT INTO " + schemaName + "." + geofenceMainTable
+        + " (" + PG_COLUMN_CREATED + ", " + PG_COLUMN_GEOFENCENAME + ") VALUES (?, ?);";
+    try {
+      reconnectIfNecessary();
+      // values are bound as parameters, the geofence name is user input
+      try (PreparedStatement insert = c.prepareStatement(sql)) {
+        insert.setTimestamp(1, Timestamp.valueOf(LocalDateTime.now()));
+        insert.setString(2, geofenceName);
+        insert.executeUpdate();
+      }
+    } catch (SQLException e1) {
+      closeAll();
+      // 23505 = unique_violation, i.e. the UNIQUE constraint of the name column
+      if ("23505".equals(e1.getSQLState())) {
+        throw new SpRuntimeException("Geofence Name is already taken. Please try another name");
+      }
+      throw new SpRuntimeException("Geofence could not be registered: " + e1.getMessage());
+    }
+  }
+
+
+  /**
+   * Stores the geometry and the update time in the row of this client's geofence.
+   *
+   * @param geom The geometry to store; its WKT text and SRID are passed to ST_GeomFromText
+   * @return {@code true} if the row of the geofence was updated, {@code false} if no row matched
+   * @throws SpRuntimeException If the statement cannot be executed
+   */
+  public boolean updateGeofenceTable(Geometry geom) throws SpRuntimeException {
+    String sql = "UPDATE " + schemaName + "." + geofenceMainTable
+        + " SET " + PG_COLUMN_UPDATED + " = ?, "
+        + PG_COLUMN_GEOMETRY + " = ST_GeomFromText(?, ?)"
+        + " WHERE " + PG_COLUMN_GEOFENCENAME + " = ?;";
+    try {
+      // the connection is reused for every event instead of opening a new one per call
+      reconnectIfNecessary();
+      try (PreparedStatement update = c.prepareStatement(sql)) {
+        update.setTimestamp(1, Timestamp.valueOf(LocalDateTime.now()));
+        update.setString(2, geom.toText());
+        update.setInt(3, geom.getSRID());
+        update.setString(4, geofenceName);
+        return update.executeUpdate() > 0;
+      }
+    } catch (SQLException e) {
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+  /**
+   * Removes the row of this client's geofence from the geofence table.
+   *
+   * @return {@code true} if a row was deleted, {@code false} if no row matched
+   * @throws SpRuntimeException If the statement cannot be executed
+   */
+  public boolean deleteTableEntry() throws SpRuntimeException {
+    String sql = "DELETE FROM " + schemaName + "." + geofenceMainTable
+        + " WHERE " + PG_COLUMN_GEOFENCENAME + " = ?;";
+    try {
+      reconnectIfNecessary();
+      try (PreparedStatement delete = c.prepareStatement(sql)) {
+        delete.setString(1, geofenceName);
+        return delete.executeUpdate() > 0;
+      }
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
new file mode 100755
index 0000000..c345a0c
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofence.java
@@ -0,0 +1,51 @@
+package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
+
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
+import org.locationtech.jts.geom.Geometry;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.runtime.EventSink;
+import org.apache.streampipes.processors.geo.jvm.jts.geofence.SpGeofencelDatabase;
+
+
+/**
+ * Event sink that stores the geometry carried by each incoming event as a named geofence.
+ *
+ * <p>On invocation the sink remembers the field selectors of the WKT geometry and the EPSG code
+ * and initializes the JDBC layer of {@link SpGeofencelDatabase}. Every event is turned into a
+ * JTS {@link Geometry} and passed to {@code updateGeofenceTable}. On detach the geofence entry
+ * is deleted again and all JDBC resources are closed.
+ */
+public class StoreGeofence extends SpGeofencelDatabase implements EventSink<StoreGeofenceParameters> {
+
+  // Not referenced inside this class; kept because it is part of the public surface.
+  public StoreGeofenceParameters storeGeofenceParameters;
+
+  // Per-instance logger: a static field assigned in onInvocation would be shared, and
+  // overwritten, by every sink instance running in the same JVM.
+  private Logger log;
+
+  private String geofenceName;
+  private String geometrySelector;
+  private String epsgSelector;
+
+
+  /**
+   * Reads the sink configuration and initializes the database access.
+   *
+   * @param parameters     binding parameters providing the geofence name and the selectors of
+   *                       the WKT and EPSG fields
+   * @param runtimeContext runtime context of the sink (not used here)
+   * @throws SpRuntimeException declared by the {@link EventSink} contract
+   */
+  @Override
+  public void onInvocation(StoreGeofenceParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+
+    this.log = parameters.getGraph().getLogger(StoreGeofence.class);
+    this.geometrySelector = parameters.getWkt();
+    this.epsgSelector = parameters.getEpsg();
+    this.geofenceName = parameters.getGeofenceName();
+
+    // NOTE(review): the pattern presumably restricts allowed identifiers - confirm in
+    // SpGeofencelDatabase#initializeJdbc.
+    initializeJdbc("^[a-zA-Z_][a-zA-Z0-9_]*$", geofenceName, log);
+  }
+
+
+  /**
+   * Builds a geometry from the event's WKT and EPSG fields and writes it to the geofence table.
+   *
+   * @param event incoming event containing the mapped WKT string and EPSG code
+   * @throws SpRuntimeException declared by the {@link EventSink} contract
+   */
+  @Override
+  public void onEvent(Event event) throws SpRuntimeException {
+
+    String wkt = event.getFieldBySelector(geometrySelector).getAsPrimitive().getAsString();
+    Integer epsg = event.getFieldBySelector(epsgSelector).getAsPrimitive().getAsInt();
+    Geometry geometry = SpGeometryBuilder.createSPGeom(wkt, epsg);
+    updateGeofenceTable(geometry);
+  }
+
+  /**
+   * Deletes the geofence entry and closes all JDBC resources.
+   *
+   * @throws SpRuntimeException if deleting the table entry fails
+   */
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    deleteTableEntry();
+    closeAll();
+  }
+
+
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
new file mode 100755
index 0000000..1df30c8
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceController.java
@@ -0,0 +1,59 @@
+package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
+
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+
+/**
+ * Declares the geofence storing sink and turns a pipeline invocation into a configured
+ * {@link StoreGeofence} instance.
+ */
+public class StoreGeofenceController extends StandaloneEventSinkDeclarer<StoreGeofenceParameters> {
+
+  protected static final String GEOMETRY_KEY = "geometry-key";
+  protected static final String EPSG_KEY = "epsg-key";
+  protected static final String GEOFENCE_KEY = "geofence-key";
+
+  private static final String APP_ID = "org.apache.streampipes.processors.geo.jvm.jts.geofence.storing";
+  private static final String GEOMETRY_DOMAIN_PROPERTY = "http://www.opengis.net/ont/geosparql#Geometry";
+  private static final String EPSG_DOMAIN_PROPERTY = "http://data.ign.fr/def/ignf#CartesianCS";
+
+  /**
+   * Describes the sink: one input stream with a geometry property and an EPSG property (each
+   * bound through a unary mapping) plus a required free-text parameter for the geofence name.
+   *
+   * @return the description of the data sink
+   */
+  @Override
+  public DataSinkDescription declareModel() {
+    return DataSinkBuilder
+        .create(APP_ID)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .category(DataSinkType.STORAGE)
+        .requiredStream(
+            StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.domainPropertyReq(GEOMETRY_DOMAIN_PROPERTY),
+                    Labels.withId(GEOMETRY_KEY),
+                    PropertyScope.MEASUREMENT_PROPERTY)
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.domainPropertyReq(EPSG_DOMAIN_PROPERTY),
+                    Labels.withId(EPSG_KEY),
+                    PropertyScope.MEASUREMENT_PROPERTY)
+                .build())
+        .requiredTextParameter(Labels.withId(GEOFENCE_KEY))
+        .supportedFormats(SupportedFormats.jsonFormat())
+        .supportedProtocols(SupportedProtocols.kafka(), SupportedProtocols.jms())
+        .build();
+  }
+
+  /**
+   * Extracts the geofence name and the two mapped field selectors and binds them to a new sink.
+   *
+   * @param graph     invocation graph of the sink
+   * @param extractor accessor for the user-supplied configuration
+   * @return the sink parameters paired with a supplier of {@link StoreGeofence} instances
+   */
+  @Override
+  public ConfiguredEventSink<StoreGeofenceParameters> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
+
+    StoreGeofenceParameters sinkParameters = new StoreGeofenceParameters(
+        graph,
+        extractor.singleValueParameter(GEOFENCE_KEY, String.class),
+        extractor.mappingPropertyValue(GEOMETRY_KEY),
+        extractor.mappingPropertyValue(EPSG_KEY));
+
+    return new ConfiguredEventSink<>(sinkParameters, () -> new StoreGeofence());
+  }
+
+
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
new file mode 100755
index 0000000..332a8c6
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/geofence/storing/StoreGeofenceParameters.java
@@ -0,0 +1,34 @@
+package org.apache.streampipes.processors.geo.jvm.jts.geofence.storing;
+
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
+
+/**
+ * Immutable holder for the configuration of the geofence storing sink: the geofence name and
+ * the two values passed in as {@code wkt} and {@code epsg}.
+ */
+public class StoreGeofenceParameters extends EventSinkBindingParams {
+
+  private final String geofenceName;
+  private final String wkt;
+  private final String epsg;
+
+  /**
+   * @param graph        invocation graph handed on to {@link EventSinkBindingParams}
+   * @param geofenceName name of the geofence
+   * @param wkt          value returned by {@link #getWkt()}
+   * @param epsg         value returned by {@link #getEpsg()}
+   */
+  public StoreGeofenceParameters(DataSinkInvocation graph, String geofenceName, String wkt, String epsg) {
+    super(graph);
+    this.geofenceName = geofenceName;
+    this.wkt = wkt;
+    this.epsg = epsg;
+  }
+
+  /** @return the geofence name given at construction */
+  public String getGeofenceName() {
+    return this.geofenceName;
+  }
+
+  /** @return the {@code wkt} value given at construction */
+  public String getWkt() {
+    return this.wkt;
+  }
+
+  /** @return the {@code epsg} value given at construction */
+  public String getEpsg() {
+    return this.epsg;
+  }
+
+}
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
new file mode 100644
index 0000000..1fbec38
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/documentation.md
@@ -0,0 +1,77 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## Trajectory from JTS Point
+
+<p align="center">
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+This processor creates a JTS LineString geometry from JTS Point events, representing a trajectory. A trajectory is defined as the path that a moving object follows through space as a function of time. Each sub-point of this LineString represents a single event. The latest sub-point represents the latest geo-event. For each Point event it is also possible to store an additional m-value representing, for example, the current speed, distance, duration or direction of this event. A trajectory con [...]
+***
+
+## Required input
+
+*  WKT String of a JTS Point Geometry
+*  Integer value representing EPSG code
+*  Number value for M-value
+
+
+***
+
+## Configuration
+
+Creates a JTS LineString geometry from JTS Point geometry events, representing a trajectory.
+
+
+### 1st parameter
+Point WKT String
+
+### 2nd parameter
+EPSG code value
+
+### 3rd parameter
+M-value for each sub-point of the trajectory
+
+### 4th parameter
+String for a description text for the trajectory
+
+### 5th parameter
+Number of allowed sub-points
+
+***
+
+## Output
+
+Adds a LineString geometry in Well Known Text (WKT) format to the event, representing a trajectory. The description text is also added to the event stream. The first event creates an empty LineString.
+
+### Example
+Creating a LineString with a threshold of 2 allowed sub-points:
+
+* First Event:
+  * Point(8.12 41.23) --> LineString <empty>
+* Second Event:
+  * Point(8.56 41.25) --> LineString(8.12 41.23, 8.56 41.25)
+* Third Event:
+  * Point(8.84 40.98) --> LineString(8.56 41.25, 8.84 40.98)
+
+M-value is not represented in the LineString but will be stored for internal use!
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png
new file mode 100644
index 0000000..7389006
Binary files /dev/null and b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/icon.png differ
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
new file mode 100644
index 0000000..6dba4e1
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.geofence.storing/strings.en
@@ -0,0 +1,20 @@
+# NOTE(review): title and description still describe a trajectory processor, while the
+# controller declares a storage sink for geofences - confirm the intended wording.
+org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.title=Single Trajectory Creator
+org.apache.streampipes.processors.geo.jvm.jts.geofence.storing.description=Creates a trajectory from JTS point events
+
+# Labels resolved by StoreGeofenceController via Labels.withId(...)
+geometry-key.title=Geofence Geometry
+geometry-key.description=WKT geometry which will be stored as geofence
+
+epsg-key.title=CRS of Input Point
+epsg-key.description=EPSG-Code of input point
+
+geofence-key.title=Geofence Name
+geofence-key.description=Name under which the geofence is stored
+
+# Entries below are not referenced by StoreGeofenceController; kept unchanged.
+point-key.title=JTS Point Event
+point-key.description=Single Point Event which will be added to the trajectory
+
+m-key.title=measurement value
+m-key.description=Measurement value which will be stored with each point event
+
+description-key.title=description text of trajectory
+description-key.description=A description text for the trajectory
+
+subpoints-key.title=number of allowed sub-points
+subpoints-key.description=amount of allowed sub-points, creating the trajectory
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
index fccc723..2f7125e 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -211,7 +211,7 @@ public class PostgresJdbcClient {
      * @throws SpRuntimeException When the connection could not be established (because of a
      *                            wrong identification, missing database etc.)
      */
-    private void connect(String host, int port, String urlName, String databaseName) throws SpRuntimeException {
+    protected void connect(String host, int port, String urlName, String databaseName) throws SpRuntimeException {
 		String url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
         try {
             c = DriverManager.getConnection(url, user, password);