You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/06/06 15:53:19 UTC

[incubator-streampipes-extensions] 01/01: jdbc client for extended, added Postgresql sink and PostgresjdbcClient

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/jdbc_rewrite
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 9e279c78b88daa908704cd62f6c5d913635c15e5
Author: micklich <fl...@disy.net>
AuthorDate: Sat Jun 6 17:52:56 2020 +0200

    jdbc client for extended, added Postgresql sink and PostgresjdbcClient
---
 notes.txt                                          | 117 +++
 .../sinks/databases/jvm/iotdb/IotDb.java           |  18 +-
 .../sinks/databases/jvm/jdbcclient/JdbcClient.java | 929 +++++++++------------
 .../databases/jvm/jdbcclient/Parameterinfo.java    |  33 +
 .../databases/jvm/jdbcclient/SqlAttribute.java     | 124 +++
 .../sinks/databases/jvm/mysql/Mysql.java           |   8 +-
 .../sinks/databases/jvm/postgresql/PostgreSql.java |   7 +-
 .../jvm/postgresql/PostgreSqlController.java       |  24 +-
 .../jvm/postgresql/PostgreSqlParameters.java       |  10 +-
 .../jvm/postgresql/PostgresJdbcClient.java         | 367 ++++++++
 10 files changed, 1100 insertions(+), 537 deletions(-)

diff --git a/notes.txt b/notes.txt
new file mode 100644
index 0000000..8d29bc5
--- /dev/null
+++ b/notes.txt
@@ -0,0 +1,117 @@
+    //========== Testing
+
+    Double lat_ka = 49.00689;
+    Double lng_ka = 8.403653;
+
+    Double lat_NY = 40.730610;
+    Double lng_NY = -73.935242;
+
+    SpLengthCalculator tester1 = new SpLengthCalculator(10);
+
+    tester1.calcGeodesicDistance(lat_ka, lng_ka, lat_NY, lng_NY);
+    tester1.convertUnit(SpLengthCalculator.ValidLengthUnits.KM);
+
+
+
+    double result_ka_ny = tester1.getLengthValue();
+
+    Double lat_ra = 48.8591174;
+    Double lng_ra = 8.2059096;
+
+    SpLengthCalculator tester2 = new SpLengthCalculator(10);
+
+    tester2.calcGeodesicDistance(lat_ka, lng_ka, lat_ra, lng_ra);
+    tester2.convertUnit(SpLengthCalculator.ValidLengthUnits.KM);
+
+    double result_ka_ra = tester2.getLengthValue();
+
+
+
+
+
+
+
+
+//  private boolean createInfoGeofenceTable(String url, String databaseName) throws SpRuntimeException {
+//    boolean returnValue = false;
+//
+//    try {
+//      // Database should exist by now so we can establish a connection
+//      c = DriverManager.getConnection(url + databaseName, user, password);
+//      st = c.createStatement();
+//      ResultSet rs = c.getMetaData().getTables(null, null, geofenceInfoTable, null);
+//      while (rs.next()) {
+//        // same table names can exists in different schmemas
+//        if (!rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
+//
+//          StringBuilder statement = new StringBuilder("CREATE TABLE ");
+//          statement.append(schemaName);
+//          statement.append(".");
+//          statement.append(geofenceInfoTable).append(" ( ");
+//          statement.append("name TEXT NOT NULL UNIQUE,");
+//          statement.append("geom GEOMETRY,");
+//          statement.append("FOREIGN KEY (name) REFERENCES )");
+//          statement.append(schema + "." + geofenceMainTable);
+//          statement.append("(name) ON UPDATE CASCADE ON DELETE CASCADE");
+//          statement.append(" );");
+//
+//          try {
+//            st.executeUpdate(statement.toString());
+//            returnValue = true;
+//          } catch (SQLException e) {
+//            e.getErrorCode();
+//            if (e.getSQLState().equals("42P07")) {
+//              throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+//            }
+//          }
+//        }
+//      }
+//    } catch (SQLException e) {
+//      closeAll();
+//      throw new SpRuntimeException(e.getMessage());
+//    }
+//    return returnValue;
+//  }
+
+
+//  @Override
+//  protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
+//    try {
+//      // Database should exist by now so we can establish a connection
+//      c = DriverManager.getConnection(url + databaseName, user, password);
+//      st = c.createStatement();
+//      ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+//      while (rs.next()) {
+//        // same table names can exists in different schmemas
+//        if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
+//          if (isToDropTable) {
+//            createTable();
+//          }
+//          validateTable();
+//        } else {
+//          createTable();
+//        }
+//      }
+//      tableExists = true;
+//      rs.close();
+//    } catch (SQLException e) {
+//      closeAll();
+//      throw new SpRuntimeException(e.getMessage());
+//    }
+//  }
+
+
+
+
+
+
+
+INSERT INTO geofence.geofences (created_at, geofencename) VALUES ('2020-06-01T11:09:09.456' , 'blubber');
+
+
+CREATE TABLE geofence.geofences ( id SERIAL PRIMARY KEY,created_at TIMESTAMP,updated_at TIMESTAMP,geofencename TEXT NOT NULL UNIQUEgeom GEOMETRY);
+
+
+UPDATE geofence.geofences SET updated_at = 2020-06-01T11:46:14.038,geom = ST_GeomFromText('POINT (11.7689 -21.3234)' ,4326)WHERE name = blubber;
+
+UPDATE geofence.geofences SET updated_at = '2020-06-01T11:58:21.276',geom = ST_GeomFromText('POINT (39.3197 15.4693)' ,4326) WHERE geofencename = blubber;
\ No newline at end of file
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
index 177270a..a21e672 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
@@ -24,6 +24,8 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.Parameterinfo;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.SqlAttribute;
 import org.apache.streampipes.vocabulary.XSD;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -110,27 +112,15 @@ public class IotDb extends JdbcClient implements EventSink<IotDbParameters> {
     }
   }
 
-  @Override
-  protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
-    checkRegEx(tableName, "Storage Group name");
-    try {
-      Statement statement = c.createStatement();
-      statement.execute("SET STORAGE GROUP TO " + tableName);
-    } catch (SQLException e) {
-      // Storage group already exists
-      //TODO: Catch other exceptions
-    }
-  }
+
 
   /**
    * Needs to be reimplemented since the IoTDB JDBC implementation does not support the methods used in the
    * JDBC-Client class
-   *
-   * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
    * @throws SpRuntimeException
    */
   @Override
-  protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
+  protected void ensureTableExists() throws SpRuntimeException {
     int index = 1;
     parameters.put("timestamp", new Parameterinfo(index++, SqlAttribute.LONG));
     for (EventProperty eventProperty : eventProperties) {
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
index 71a357f..ee67bac 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/JdbcClient.java
@@ -33,543 +33,446 @@ import java.util.Map;
 
 
 public class JdbcClient {
-    private String allowedRegEx;
-
-    protected String tableName;
-    protected String user;
-    protected String password;
-
-    protected boolean tableExists = false;
-
-    protected Logger logger;
-
-    protected Connection c = null;
-    protected Statement st = null;
-    protected PreparedStatement ps = null;
-
-    /**
-     * The list of properties extracted from the graph
-     */
-    protected List<EventProperty> eventProperties;
-    /**
-     * The parameters in the prepared statement {@code ps} together with their index and data type
-     */
-    protected HashMap<String, Parameterinfo> parameters = new HashMap<>();
-
-    /**
-     * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
-     * If no matching type is found, it is interpreted as a String (VARCHAR(255))
-     */
-    protected enum SqlAttribute {
-        INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
-        BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
-        private final String sqlName;
-
-        SqlAttribute(String s) {
-            sqlName = s;
-        }
-
-        /**
-         * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
-         * interpreted as a String (VARCHAR(255))
-         *
-         * @param o The object which should be identified
-         * @return An {@link SqlAttribute} of the identified type
-         */
-        public static SqlAttribute getFromObject(final Object o) {
-            SqlAttribute r;
-            if (o instanceof Integer) {
-                r = SqlAttribute.INTEGER;
-            } else if (o instanceof Long) {
-                r = SqlAttribute.LONG;
-            } else if (o instanceof Float) {
-                r = SqlAttribute.FLOAT;
-            } else if (o instanceof Double) {
-                r = SqlAttribute.DOUBLE;
-            } else if (o instanceof Boolean) {
-                r = SqlAttribute.BOOLEAN;
-            } else {
-                r = SqlAttribute.STRING;
-            }
-            return r;
-        }
-
-        public static SqlAttribute getFromUri(final String s) {
-            SqlAttribute r;
-            if (s.equals(XSD._integer.toString())) {
-                r = SqlAttribute.INTEGER;
-            } else if (s.equals(XSD._long.toString())) {
-                r = SqlAttribute.LONG;
-            } else if (s.equals(XSD._float.toString())) {
-                r = SqlAttribute.FLOAT;
-            } else if (s.equals(XSD._double.toString())) {
-                r = SqlAttribute.DOUBLE;
-            } else if (s.equals(XSD._boolean.toString())) {
-                r = SqlAttribute.BOOLEAN;
-            } else {
-                r = SqlAttribute.STRING;
-            }
-            return r;
-        }
-
-        /**
-         * Sets the value in the prepardStatement {@code ps}
-         *
-         * @param p     The needed info about the parameter (index and type)
-         * @param value The value of the object, which should be filled in the
-         * @param ps    The prepared statement, which will be filled
-         * @throws SpRuntimeException When the data type in {@code p} is unknown
-         * @throws SQLException       When the setters of the statement throw an
-         *                            exception (e.g. {@code setInt()})
-         */
-        public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
-                throws SQLException, SpRuntimeException {
-            switch (p.type) {
-                case INTEGER:
-                    ps.setInt(p.index, (Integer) value);
-                    break;
-                case LONG:
-                    ps.setLong(p.index, (Long) value);
-                    break;
-                case FLOAT:
-                    ps.setFloat(p.index, (Float) value);
-                    break;
-                case DOUBLE:
-                    ps.setDouble(p.index, (Double) value);
-                    break;
-                case BOOLEAN:
-                    ps.setBoolean(p.index, (Boolean) value);
-                    break;
-                case STRING:
-                    ps.setString(p.index, value.toString());
-                    break;
-                default:
-                    throw new SpRuntimeException("Unknown SQL datatype");
-            }
-        }
-
-        @Override
-        public String toString() {
-            return sqlName;
-        }
+  protected String allowedRegEx;
+  protected String url;
+
+  protected String databaseName;
+  protected String tableName;
+  protected String user;
+  protected String password;
+
+  protected boolean tableExists = false;
+
+  protected Logger logger;
+
+  protected Connection c = null;
+  protected Statement st = null;
+  protected PreparedStatement ps = null;
+
+  /**
+   * The list of properties extracted from the graph
+   */
+  protected List<EventProperty> eventProperties;
+  /**
+   * The parameters in the prepared statement {@code ps} together with their index and data type
+   */
+  protected HashMap<String, Parameterinfo> parameters = new HashMap<>();
+
+  /**
+   * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
+   * If no matching type is found, it is interpreted as a String (VARCHAR(255))
+   */
+
+
+  public JdbcClient() {
+  }
+
+  protected void initializeJdbc(List<EventProperty> eventProperties,
+                                String host,
+                                Integer port,
+                                String databaseName,
+                                String tableName,
+                                String user,
+                                String password,
+                                String allowedRegEx,
+                                String driver,
+                                String urlName,
+                                Logger logger) throws SpRuntimeException {
+
+    this.eventProperties = eventProperties;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.user = user;
+    this.password = password;
+    this.allowedRegEx = allowedRegEx;
+    this.logger = logger;
+
+    this.url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
+    try {
+      Class.forName(driver);
+    } catch (ClassNotFoundException e) {
+      throw new SpRuntimeException("Driver '" + driver + "' not found.");
     }
 
-    /**
-     * Contains all information needed to "fill" a prepared statement (index and the data type)
-     */
-    protected static class Parameterinfo {
-        private int index;
-        private SqlAttribute type;
-
-        public Parameterinfo(final int index, final SqlAttribute type) {
-            this.index = index;
-            this.type = type;
-        }
+    connect();
+    ensureDatabaseExists();
+    ensureTableExists();
+  }
+
+  /**
+   * Connects to the HadoopFileSystem Server and initilizes {@link JdbcClient#c} and
+   * {@link JdbcClient#st}
+   *
+   * @throws SpRuntimeException When the connection could not be established (because of a
+   *                            wrong identification, missing database etc.)
+   */
+  protected void connect() throws SpRuntimeException {
+    try {
+      c = DriverManager.getConnection(url, user, password);
+    } catch (SQLException e) {
+      // host or port is wrong -- Class 08  Connection Exception
+      if (e.getSQLState().substring(0, 2).equals("08")) {
+        throw new SpRuntimeException("Connection can't be established. Check host or port setting: \n" + e.getMessage());
+      }
+      // username or password is wrong -- Class 28  Invalid Authorization Specification
+      else if (e.getSQLState().substring(0, 2).equals("28")) {
+        throw new SpRuntimeException("User authentication error. Check username or password: \n" + e.getMessage());
+      } else {
+        throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
+      }
     }
-
-
-    public JdbcClient() {
+  }
+
+  /**
+   * If this method returns successfully a database with the given name exists on the server, specified by the url.
+   *
+   * @throws SpRuntimeException If the database does not exists and could not be created
+   */
+  protected void ensureDatabaseExists() throws SpRuntimeException {
+    checkRegEx(databaseName, "databasename");
+
+    try {
+      // Checks whether the database already exists (using catalogs has not worked with postgres)
+      st = c.createStatement();
+      st.executeUpdate("CREATE DATABASE " + databaseName + ";");
+      logger.info("Created new database '" + databaseName + "'");
+    } catch (SQLException e1) {
+      if (!e1.getSQLState().substring(0, 2).equals("42")) {
+        throw new SpRuntimeException("Error while creating database: " + e1.getMessage());
+      }
     }
-
-    protected void initializeJdbc(List<EventProperty> eventProperties,
-                                  String host,
-                                  Integer port,
-                                  String databaseName,
-                                  String tableName,
-                                  String user,
-                                  String password,
-                                  String allowedRegEx,
-                                  String driver,
-                                  String urlName,
-                                  Logger logger) throws SpRuntimeException {
-        this.tableName = tableName;
-        this.user = user;
-        this.password = password;
-        this.allowedRegEx = allowedRegEx;
-        this.logger = logger;
-        this.eventProperties = eventProperties;
-        try {
-            Class.forName(driver);
-        } catch (ClassNotFoundException e) {
-            throw new SpRuntimeException("Driver '" + driver + "' not found.");
-        }
-
-        connect(host, port, urlName, databaseName);
+    closeAll();
+  }
+
+  /**
+   * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
+   * with the given database name exists on the server, specified by the url.
+   *
+   * @throws SpRuntimeException If the table does not exist and could not be created
+   */
+  protected void ensureTableExists() throws SpRuntimeException {
+    try {
+      // Database should exist by now so we can establish a connection
+      c = DriverManager.getConnection(url + databaseName, user, password);
+      st = c.createStatement();
+      ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+      if (rs.next()) {
+        validateTable();
+      } else {
+        createTable();
+      }
+      tableExists = true;
+      rs.close();
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
     }
-
-
-    /**
-     * Connects to the HadoopFileSystem Server and initilizes {@link JdbcClient#c} and
-     * {@link JdbcClient#st}
-     *
-     * @throws SpRuntimeException When the connection could not be established (because of a
-     *                            wrong identification, missing database etc.)
-     */
-    private void connect(String host, int port, String urlName, String databaseName) throws SpRuntimeException {
-		String url = "jdbc:" + urlName + "://" + host + ":" + port + "/";
-        try {
-            c = DriverManager.getConnection(url, user, password);
-            ensureDatabaseExists(url, databaseName);
-            ensureTableExists(url, databaseName);
-        } catch (SQLException e) {
-            throw new SpRuntimeException("Could not establish a connection with the server: " + e.getMessage());
-        }
+  }
+
+  /**
+   * Clears, fills and executes the saved prepared statement {@code ps} with the data found in
+   * event. To fill in the values it calls {@link JdbcClient#fillPreparedStatement(Map)}.
+   *
+   * @param event Data to be saved in the SQL table
+   * @throws SQLException       When the statement cannot be executed
+   * @throws SpRuntimeException When the table name is not allowed or it is thrown
+   *                            by {@link SqlAttribute#setValue(Parameterinfo, Object, PreparedStatement)}
+   */
+  protected void executePreparedStatement(final Map<String, Object> event)
+      throws SQLException, SpRuntimeException {
+    checkConnected();
+    if (ps != null) {
+      ps.clearParameters();
     }
-
-	/**
-	 * If this method returns successfully a database with the given name exists on the server, specified by the url.
-	 *
-	 * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
-	 * @param databaseName The name of the database that should exist
-	 * @throws SpRuntimeException If the database does not exists and could not be created
-	 */
-	protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
-		checkRegEx(databaseName, "databasename");
-
-		try {
-			// Checks whether the database already exists (using catalogs has not worked with postgres)
-			st = c.createStatement();
-			st.executeUpdate("CREATE DATABASE " + databaseName + ";");
-			logger.info("Created new database '" + databaseName + "'");
-		} catch (SQLException e1) {
-			if (!e1.getSQLState().substring(0, 2).equals("42")) {
-				throw new SpRuntimeException("Error while creating database: " + e1.getMessage());
-			}
-		}
-		closeAll();
-	}
-
-	/**
-	 * If this method returns successfully a table with the name in {@link JdbcClient#tableName} exists in the database
-	 * with the given database name exists on the server, specified by the url.
-	 *
-	 * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
-	 * @param databaseName The database in which the table should exist
-	 * @throws SpRuntimeException If the table does not exist and could not be created
-	 */
-	protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
-		try {
-			// Database should exist by now so we can establish a connection
-			c = DriverManager.getConnection(url + databaseName, user, password);
-			st = c.createStatement();
-			ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
-			if (rs.next()) {
-				validateTable();
-			} else {
-				createTable();
-			}
-			tableExists = true;
-			rs.close();
-		} catch (SQLException e) {
-			closeAll();
-			throw new SpRuntimeException(e.getMessage());
-		}
-	}
-
-    /**
-     * Clears, fills and executes the saved prepared statement {@code ps} with the data found in
-     * event. To fill in the values it calls {@link JdbcClient#fillPreparedStatement(Map)}.
-     *
-     * @param event Data to be saved in the SQL table
-     * @throws SQLException       When the statement cannot be executed
-     * @throws SpRuntimeException When the table name is not allowed or it is thrown
-     *                            by {@link SqlAttribute#setValue(Parameterinfo, Object, PreparedStatement)}
-     */
-    private void executePreparedStatement(final Map<String, Object> event)
-            throws SQLException, SpRuntimeException {
-        checkConnected();
-        if (ps != null) {
-            ps.clearParameters();
-        }
-        fillPreparedStatement(event);
-        ps.executeUpdate();
+    fillPreparedStatement(event);
+    ps.executeUpdate();
+  }
+
+  /**
+   * Prepares a statement for the insertion of values or the
+   *
+   * @param event The event which should be saved to the Postgres table
+   * @throws SpRuntimeException When there was an error in the saving process
+   */
+  protected void save(final Event event) throws SpRuntimeException {
+    //TODO: Add batch support (https://stackoverflow.com/questions/3784197/efficient-way-to-do-batch-inserts-with-jdbc)
+    checkConnected();
+    Map<String, Object> eventMap = event.getRaw();
+    if (event == null) {
+      throw new SpRuntimeException("event is null");
     }
-
-	/**
-	 * Prepares a statement for the insertion of values or the
-	 *
-	 * @param event The event which should be saved to the Postgres table
-	 * @throws SpRuntimeException When there was an error in the saving process
-	 */
-	protected void save(final Event event) throws SpRuntimeException {
-		//TODO: Add batch support (https://stackoverflow.com/questions/3784197/efficient-way-to-do-batch-inserts-with-jdbc)
-		checkConnected();
-		Map<String, Object> eventMap = event.getRaw();
-		if (event == null) {
-			throw new SpRuntimeException("event is null");
-		}
-		if (!tableExists) {
-			// Creates the table
-			createTable();
-			tableExists = true;
-		}
-		try {
-			executePreparedStatement(eventMap);
-		} catch (SQLException e) {
-			if (e.getSQLState().substring(0, 2).equals("42")) {
-				// If the table does not exists (because it got deleted or something, will cause the error
-				// code "42") we will try to create a new one. Otherwise we do not handle the exception.
-				logger.warn("Table '" + tableName + "' was unexpectedly not found and gets recreated.");
-				tableExists = false;
-				createTable();
-				tableExists = true;
-
-				try {
-					executePreparedStatement(eventMap);
-				} catch (SQLException e1) {
-					throw new SpRuntimeException(e1.getMessage());
-				}
-			} else {
-				throw new SpRuntimeException(e.getMessage());
-			}
-		}
-	}
-
-    private void fillPreparedStatement(final Map<String, Object> event)
-            throws SQLException, SpRuntimeException {
-        fillPreparedStatement(event, "");
+    if (!tableExists) {
+      // Creates the table
+      createTable();
+      tableExists = true;
     }
+    try {
+      executePreparedStatement(eventMap);
+    } catch (SQLException e) {
+      if (e.getSQLState().substring(0, 2).equals("42")) {
+        // If the table does not exists (because it got deleted or something, will cause the error
+        // code "42") we will try to create a new one. Otherwise we do not handle the exception.
+        logger.warn("Table '" + tableName + "' was unexpectedly not found and gets recreated.");
+        tableExists = false;
+        createTable();
+        tableExists = true;
 
-    /**
-     * Fills a prepared statement with the actual values base on {@link JdbcClient#parameters}. If
-     * {@link JdbcClient#parameters} is empty or not complete (which should only happen once in the
-     * begining), it calls {@link JdbcClient#generatePreparedStatement(Map)} to generate a new one.
-     *
-     * @param event
-     * @param pre
-     * @throws SQLException
-     * @throws SpRuntimeException
-     */
-    private void fillPreparedStatement(final Map<String, Object> event, String pre)
-            throws SQLException, SpRuntimeException {
-        // checkConnected();
-        //TODO: Possible error: when the event does not contain all objects of the parameter list
-        for (Map.Entry<String, Object> pair : event.entrySet()) {
-            String newKey = pre + pair.getKey();
-            if (pair.getValue() instanceof Map) {
-                // recursively extracts nested values
-                fillPreparedStatement((Map<String, Object>) pair.getValue(), newKey + "_");
-            } else {
-                if (!parameters.containsKey(newKey)) {
-                    //TODO: start the for loop all over again
-                    generatePreparedStatement(event);
-                }
-                Parameterinfo p = parameters.get(newKey);
-                SqlAttribute.setValue(p, pair.getValue(), ps);
-            }
+        try {
+          executePreparedStatement(eventMap);
+        } catch (SQLException e1) {
+          throw new SpRuntimeException(e1.getMessage());
         }
+      } else {
+        throw new SpRuntimeException(e.getMessage());
+      }
     }
-
-    /**
-     * Initializes the variables {@link JdbcClient#parameters} and {@link JdbcClient#ps}
-     * according to the parameter event.
-     *
-     * @param event The event which is getting analyzed
-     * @throws SpRuntimeException When the tablename is not allowed
-     * @throws SQLException       When the prepareStatment cannot be evaluated
-     */
-    private void generatePreparedStatement(final Map<String, Object> event)
-            throws SQLException, SpRuntimeException {
-        // input: event
-        // wanted: INSERT INTO test4321 ( randomString, randomValue ) VALUES ( ?,? );
-        checkConnected();
-        parameters.clear();
-        StringBuilder statement1 = new StringBuilder("INSERT INTO ");
-        StringBuilder statement2 = new StringBuilder("VALUES ( ");
-        checkRegEx(tableName, "Tablename");
-        statement1.append(tableName).append(" ( ");
-
-        // Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
-        extendPreparedStatement(event, statement1, statement2, 1);
-
-        statement1.append(" ) ");
-        statement2.append(" );");
-        String finalStatement = statement1.append(statement2).toString();
-        ps = c.prepareStatement(finalStatement);
+  }
+
+  protected void fillPreparedStatement(final Map<String, Object> event)
+      throws SQLException, SpRuntimeException {
+    fillPreparedStatement(event, "");
+  }
+
+  /**
+   * Fills a prepared statement with the actual values base on {@link JdbcClient#parameters}. If
+   * {@link JdbcClient#parameters} is empty or not complete (which should only happen once in the
+   * begining), it calls {@link JdbcClient#generatePreparedStatement(Map)} to generate a new one.
+   *
+   * @param event
+   * @param pre
+   * @throws SQLException
+   * @throws SpRuntimeException
+   */
+  protected void fillPreparedStatement(final Map<String, Object> event, String pre)
+      throws SQLException, SpRuntimeException {
+    // checkConnected();
+    //TODO: Possible error: when the event does not contain all objects of the parameter list
+    for (Map.Entry<String, Object> pair : event.entrySet()) {
+      String newKey = pre + pair.getKey();
+      if (pair.getValue() instanceof Map) {
+        // recursively extracts nested values
+        fillPreparedStatement((Map<String, Object>) pair.getValue(), newKey + "_");
+      } else {
+        if (!parameters.containsKey(newKey)) {
+          //TODO: start the for loop all over again
+          generatePreparedStatement(event);
+        }
+        Parameterinfo p = parameters.get(newKey);
+        SqlAttribute.setValue(p, pair.getValue(), ps);
+      }
     }
-
-    private int extendPreparedStatement(final Map<String, Object> event,
+  }
+
+  /**
+   * Initializes the variables {@link JdbcClient#parameters} and {@link JdbcClient#ps}
+   * according to the parameter event.
+   *
+   * @param event The event which is getting analyzed
+   * @throws SpRuntimeException When the tablename is not allowed
+   * @throws SQLException       When the prepareStatment cannot be evaluated
+   */
+  protected void generatePreparedStatement(final Map<String, Object> event)
+      throws SQLException, SpRuntimeException {
+    // input: event
+    // wanted: INSERT INTO test4321 ( randomString, randomValue ) VALUES ( ?,? );
+    checkConnected();
+    parameters.clear();
+    StringBuilder statement1 = new StringBuilder("INSERT INTO ");
+    StringBuilder statement2 = new StringBuilder("VALUES ( ");
+    checkRegEx(tableName, "Tablename");
+    statement1.append(tableName).append(" ( ");
+
+    // Starts index at 1, since the parameterIndex in the PreparedStatement starts at 1 as well
+    extendPreparedStatement(event, statement1, statement2, 1);
+
+    statement1.append(" ) ");
+    statement2.append(" );");
+    String finalStatement = statement1.append(statement2).toString();
+    ps = c.prepareStatement(finalStatement);
+  }
+
+  protected int extendPreparedStatement(final Map<String, Object> event,
                                         StringBuilder s1, StringBuilder s2, int index) throws SpRuntimeException {
-        return extendPreparedStatement(event, s1, s2, index, "", "");
-    }
-
-    /**
-     * @param event
-     * @param s1
-     * @param s2
-     * @param index
-     * @param preProperty
-     * @param pre
-     * @return
-     * @throws SpRuntimeException
-     */
-    private int extendPreparedStatement(final Map<String, Object> event,
+    return extendPreparedStatement(event, s1, s2, index, "", "");
+  }
+
+  /**
+   * @param event
+   * @param s1
+   * @param s2
+   * @param index
+   * @param preProperty
+   * @param pre
+   * @return
+   * @throws SpRuntimeException
+   */
+  protected int extendPreparedStatement(final Map<String, Object> event,
                                         StringBuilder s1, StringBuilder s2, int index, String preProperty, String pre)
-            throws SpRuntimeException {
-        checkConnected();
-        for (Map.Entry<String, Object> pair : event.entrySet()) {
-            if (pair.getValue() instanceof Map) {
-                index = extendPreparedStatement((Map<String, Object>) pair.getValue(), s1, s2, index,
-                        pair.getKey() + "_", pre);
-            } else {
-                checkRegEx(pair.getKey(), "Columnname");
-                parameters.put(pair.getKey(), new Parameterinfo(index, SqlAttribute.getFromObject(pair.getValue())));
-                s1.append(pre).append("\"").append(preProperty).append(pair.getKey()).append("\"");
-                s2.append(pre).append("?");
-                index++;
-            }
-            pre = ", ";
-        }
-        return index;
+      throws SpRuntimeException {
+    checkConnected();
+    for (Map.Entry<String, Object> pair : event.entrySet()) {
+      if (pair.getValue() instanceof Map) {
+        index = extendPreparedStatement((Map<String, Object>) pair.getValue(), s1, s2, index,
+            pair.getKey() + "_", pre);
+      } else {
+        checkRegEx(pair.getKey(), "Columnname");
+        parameters.put(pair.getKey(), new Parameterinfo(index, SqlAttribute.getFromObject(pair.getValue())));
+        s1.append(pre).append("\"").append(preProperty).append(pair.getKey()).append("\"");
+        s2.append(pre).append("?");
+        index++;
+      }
+      pre = ", ";
     }
-
-    /**
-     * Creates a table with the name {@link JdbcClient#tableName} and the
-     * properties {@link JdbcClient#eventProperties}. Calls
-     * {@link JdbcClient#extractEventProperties(List)} internally with the
-     * {@link JdbcClient#eventProperties} to extract all possible columns.
-     *
-     * @throws SpRuntimeException If the {@link JdbcClient#tableName}  is not allowed, if
-     *                            executeUpdate throws an SQLException or if {@link JdbcClient#extractEventProperties(List)}
-     *                            throws an exception
-     */
-	protected void createTable() throws SpRuntimeException {
-        checkConnected();
-        checkRegEx(tableName, "Tablename");
-
-        StringBuilder statement = new StringBuilder("CREATE TABLE \"");
-        statement.append(tableName).append("\" ( ");
-        statement.append(extractEventProperties(eventProperties)).append(" );");
-
-        try {
-            st.executeUpdate(statement.toString());
-        } catch (SQLException e) {
-            throw new SpRuntimeException(e.getMessage());
+    return index;
+  }
+
+  /**
+   * Creates a table with the name {@link JdbcClient#tableName} and the
+   * properties {@link JdbcClient#eventProperties}. Calls
+   * {@link JdbcClient#extractEventProperties(List)} internally with the
+   * {@link JdbcClient#eventProperties} to extract all possible columns.
+   *
+   * @throws SpRuntimeException If the {@link JdbcClient#tableName}  is not allowed, if
+   *                            executeUpdate throws an SQLException or if {@link JdbcClient#extractEventProperties(List)}
+   *                            throws an exception
+   */
+  protected void createTable() throws SpRuntimeException {
+    checkConnected();
+    checkRegEx(tableName, "Tablename");
+
+    StringBuilder statement = new StringBuilder("CREATE TABLE \"");
+    statement.append(tableName).append("\" ( ");
+    statement.append(extractEventProperties(eventProperties)).append(" );");
+
+    try {
+      st.executeUpdate(statement.toString());
+    } catch (SQLException e) {
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+  /**
+   * Creates a SQL-Query with the given Properties (SQL-Injection safe). Calls
+   * {@link JdbcClient#extractEventProperties(List, String)} with an empty string
+   *
+   * @param properties The list of properties which should be included in the query
+   * @return A StringBuilder with the query which needs to be executed in order to create the table
+   * @throws SpRuntimeException See {@link JdbcClient#extractEventProperties(List, String)} for details
+   */
+  protected StringBuilder extractEventProperties(List<EventProperty> properties)
+      throws SpRuntimeException {
+    return extractEventProperties(properties, "");
+  }
+
+  /**
+   * Creates a SQL-Query with the given Properties (SQL-Injection safe). For nested properties it
+   * recursively extracts the information. EventPropertyList are getting converted to a string (so
+   * in SQL to a VARCHAR(255)). For each type it uses {@link SqlAttribute#getFromUri(String)}
+   * internally to identify the SQL-type from the runtimeType.
+   *
+   * @param properties  The list of properties which should be included in the query
+   * @param preProperty A string which gets prepended to all property runtimeNames
+   * @return A StringBuilder with the query which needs to be executed in order to create the table
+   * @throws SpRuntimeException If the runtimeName of any property is not allowed
+   */
+  protected StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+      throws SpRuntimeException {
+    // output: "randomString VARCHAR(255), randomValue INT"
+    StringBuilder s = new StringBuilder();
+    String pre = "";
+    for (EventProperty property : properties) {
+      // Protection against SqlInjection
+
+      checkRegEx(property.getRuntimeName(), "Column name");
+      if (property instanceof EventPropertyNested) {
+        // if it is a nested property, recursively extract the needed properties
+        StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
+            preProperty + property.getRuntimeName() + "_");
+        if (tmp.length() > 0) {
+          s.append(pre).append(tmp);
+        }
+      } else {
+        // Adding the name of the property (e.g. "randomString")
+        // Or for properties in a nested structure: input1_randomValue
+        // "pre" is there for the ", " part
+        s.append(pre).append("\"").append(preProperty).append(property.getRuntimeName()).append("\" ");
+
+        // adding the type of the property (e.g. "VARCHAR(255)")
+        if (property instanceof EventPropertyPrimitive) {
+          s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
+        } else {
+          // Must be an EventPropertyList then
+          s.append(SqlAttribute.getFromUri(XSD._string.toString()));
         }
+      }
+      pre = ", ";
     }
 
-    /**
-     * Creates a SQL-Query with the given Properties (SQL-Injection safe). Calls
-     * {@link JdbcClient#extractEventProperties(List, String)} with an empty string
-     *
-     * @param properties The list of properties which should be included in the query
-     * @return A StringBuilder with the query which needs to be executed in order to create the table
-     * @throws SpRuntimeException See {@link JdbcClient#extractEventProperties(List, String)} for details
-     */
-    private StringBuilder extractEventProperties(List<EventProperty> properties)
-            throws SpRuntimeException {
-        return extractEventProperties(properties, "");
+    return s;
+  }
+
+  /**
+   * Checks if the input string is allowed (regEx match and length > 0)
+   *
+   * @param input           String which is getting matched with the regEx
+   * @param regExIdentifier Information about the use of the input. Gets included in the exception message
+   * @throws SpRuntimeException If {@code input} does not match with {@link JdbcClient#allowedRegEx}
+   *                            or if the length of {@code input} is 0
+   */
+  protected final void checkRegEx(String input, String regExIdentifier) throws SpRuntimeException {
+    if (!input.matches(allowedRegEx) || input.length() == 0) {
+      throw new SpRuntimeException(regExIdentifier + " '" + input
+          + "' not allowed (allowed: '" + allowedRegEx + "') with a min length of 1");
     }
+  }
 
-    /**
-     * Creates a SQL-Query with the given Properties (SQL-Injection safe). For nested properties it
-     * recursively extracts the information. EventPropertyList are getting converted to a string (so
-     * in SQL to a VARCHAR(255)). For each type it uses {@link SqlAttribute#getFromUri(String)}
-     * internally to identify the SQL-type from the runtimeType.
-     *
-     * @param properties  The list of properties which should be included in the query
-     * @param preProperty A string which gets prepended to all property runtimeNames
-     * @return A StringBuilder with the query which needs to be executed in order to create the table
-     * @throws SpRuntimeException If the runtimeName of any property is not allowed
-     */
-    private StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
-            throws SpRuntimeException {
-        // output: "randomString VARCHAR(255), randomValue INT"
-        StringBuilder s = new StringBuilder();
-        String pre = "";
-        for (EventProperty property : properties) {
-            // Protection against SqlInjection
-
-            checkRegEx(property.getRuntimeName(), "Column name");
-            if (property instanceof EventPropertyNested) {
-                // if it is a nested property, recursively extract the needed properties
-                StringBuilder tmp = extractEventProperties(((EventPropertyNested) property).getEventProperties(),
-                        preProperty + property.getRuntimeName() + "_");
-                if (tmp.length() > 0) {
-                    s.append(pre).append(tmp);
-                }
-            } else {
-                // Adding the name of the property (e.g. "randomString")
-                // Or for properties in a nested structure: input1_randomValue
-                // "pre" is there for the ", " part
-                s.append(pre).append("\"").append(preProperty).append(property.getRuntimeName()).append("\" ");
-
-                // adding the type of the property (e.g. "VARCHAR(255)")
-                if (property instanceof EventPropertyPrimitive) {
-                    s.append(SqlAttribute.getFromUri(((EventPropertyPrimitive) property).getRuntimeType()));
-                } else {
-                    // Must be an EventPropertyList then
-                    s.append(SqlAttribute.getFromUri(XSD._string.toString()));
-                }
-            }
-            pre = ", ";
-        }
-
-        return s;
+  protected void validateTable() throws SpRuntimeException {
+    //TODO: Add validation of an existing table
+    if (false) {
+      throw new SpRuntimeException("Table '" + tableName + "' does not match the eventproperties");
     }
-
-    /**
-     * Checks if the input string is allowed (regEx match and length > 0)
-     *
-     * @param input String which is getting matched with the regEx
-     * @param regExIdentifier Information about the use of the input. Gets included in the exception message
-     * @throws SpRuntimeException If {@code input} does not match with {@link JdbcClient#allowedRegEx}
-     *                            or if the length of {@code input} is 0
-     */
-    protected final void checkRegEx(String input, String regExIdentifier) throws SpRuntimeException {
-        if (!input.matches(allowedRegEx) || input.length() == 0) {
-            throw new SpRuntimeException(regExIdentifier + " '" + input
-                    + "' not allowed (allowed: '" + allowedRegEx + "') with a min length of 1");
-        }
+  }
+
+  /**
+   * Closes all open connections and statements of JDBC
+   */
+  protected void closeAll() {
+    boolean error = false;
+    try {
+      if (st != null) {
+        st.close();
+        st = null;
+      }
+    } catch (SQLException e) {
+      error = true;
+      logger.warn("Exception when closing the statement: " + e.getMessage());
     }
-
-    protected void validateTable() throws SpRuntimeException {
-        //TODO: Add validation of an existing table
-        if (false) {
-            throw new SpRuntimeException("Table '" + tableName + "' does not match the eventproperties");
-        }
+    try {
+      if (c != null) {
+        c.close();
+        c = null;
+      }
+    } catch (SQLException e) {
+      error = true;
+      logger.warn("Exception when closing the connection: " + e.getMessage());
     }
-
-    /**
-     * Closes all open connections and statements of JDBC
-     */
-    protected void closeAll() {
-        boolean error = false;
-        try {
-            if (st != null) {
-                st.close();
-                st = null;
-            }
-        } catch (SQLException e) {
-            error = true;
-            logger.warn("Exception when closing the statement: " + e.getMessage());
-        }
-        try {
-            if (c != null) {
-                c.close();
-                c = null;
-            }
-        } catch (SQLException e) {
-            error = true;
-            logger.warn("Exception when closing the connection: " + e.getMessage());
-        }
-        try {
-            if (ps != null) {
-                ps.close();
-                ps = null;
-            }
-        } catch (SQLException e) {
-            error = true;
-            logger.warn("Exception when closing the prepared statement: " + e.getMessage());
-        }
-        if (!error) {
-            logger.info("Shutdown all connections successfully.");
-        }
+    try {
+      if (ps != null) {
+        ps.close();
+        ps = null;
+      }
+    } catch (SQLException e) {
+      error = true;
+      logger.warn("Exception when closing the prepared statement: " + e.getMessage());
+    }
+    if (!error) {
+      logger.info("Shutdown all connections successfully.");
     }
+  }
 
-    protected void checkConnected() throws SpRuntimeException {
-        if (c == null) {
-            throw new SpRuntimeException("Connection is not established.");
-        }
+  protected void checkConnected() throws SpRuntimeException {
+    if (c == null) {
+      throw new SpRuntimeException("Connection is not established.");
     }
+  }
 }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/Parameterinfo.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/Parameterinfo.java
new file mode 100644
index 0000000..74fc329
--- /dev/null
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/Parameterinfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.jdbcclient;
+
+/**
+ * Contains all information needed to "fill" a prepared statement (index and the data type)
+ */
+public class Parameterinfo {
+  protected int index;
+  protected SqlAttribute type;
+
+  public Parameterinfo(final int index, final SqlAttribute type) {
+    this.index = index;
+    this.type = type;
+  }
+}
+
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
new file mode 100644
index 0000000..88f0059
--- /dev/null
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/jdbcclient/SqlAttribute.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.jdbcclient;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.vocabulary.XSD;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+
+/**
+ * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
+ * If no matching type is found, it is interpreted as a String (VARCHAR(255))
+ */
+public enum SqlAttribute {
+  INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE"), STRING("VARCHAR(255)"),
+  BOOLEAN("BOOLEAN"), DATETIME("DATETIME");
+  private final String sqlName;
+
+  SqlAttribute(String s) {
+    sqlName = s;
+  }
+
+  /**
+   * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
+   * interpreted as a String (VARCHAR(255))
+   *
+   * @param o The object which should be identified
+   * @return An {@link SqlAttribute} of the identified type
+   */
+  public static SqlAttribute getFromObject(final Object o) {
+    SqlAttribute r;
+    if (o instanceof Integer) {
+      r = SqlAttribute.INTEGER;
+    } else if (o instanceof Long) {
+      r = SqlAttribute.LONG;
+    } else if (o instanceof Float) {
+      r = SqlAttribute.FLOAT;
+    } else if (o instanceof Double) {
+      r = SqlAttribute.DOUBLE;
+    } else if (o instanceof Boolean) {
+      r = SqlAttribute.BOOLEAN;
+    } else {
+      r = SqlAttribute.STRING;
+    }
+    return r;
+  }
+
+  public static SqlAttribute getFromUri(final String s) {
+    SqlAttribute r;
+    if (s.equals(XSD._integer.toString())) {
+      r = SqlAttribute.INTEGER;
+    } else if (s.equals(XSD._long.toString())) {
+      r = SqlAttribute.LONG;
+    } else if (s.equals(XSD._float.toString())) {
+      r = SqlAttribute.FLOAT;
+    } else if (s.equals(XSD._double.toString())) {
+      r = SqlAttribute.DOUBLE;
+    } else if (s.equals(XSD._boolean.toString())) {
+      r = SqlAttribute.BOOLEAN;
+    } else {
+      r = SqlAttribute.STRING;
+    }
+    return r;
+  }
+
+  /**
+   * Sets the value in the prepardStatement {@code ps}
+   *
+   * @param p     The needed info about the parameter (index and type)
+   * @param value The value of the object, which should be filled in the
+   * @param ps    The prepared statement, which will be filled
+   * @throws SpRuntimeException When the data type in {@code p} is unknown
+   * @throws SQLException       When the setters of the statement throw an
+   *                            exception (e.g. {@code setInt()})
+   */
+  public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
+      throws SQLException, SpRuntimeException {
+    switch (p.type) {
+      case INTEGER:
+        ps.setInt(p.index, (Integer) value);
+        break;
+      case LONG:
+        ps.setLong(p.index, (Long) value);
+        break;
+      case FLOAT:
+        ps.setFloat(p.index, (Float) value);
+        break;
+      case DOUBLE:
+        ps.setDouble(p.index, (Double) value);
+        break;
+      case BOOLEAN:
+        ps.setBoolean(p.index, (Boolean) value);
+        break;
+      case STRING:
+        ps.setString(p.index, value.toString());
+        break;
+      default:
+        throw new SpRuntimeException("Unknown SQL datatype");
+    }
+  }
+
+  @Override
+  public String toString() {
+    return sqlName;
+  }
+}
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
index 7009970..95c3538 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/mysql/Mysql.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.schema.EventPropertyNested;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.SqlAttribute;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.XSD;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
@@ -32,8 +33,6 @@ import org.apache.streampipes.model.schema.EventProperty;
 
 import java.sql.*;
 import java.util.*;
-import java.util.Objects;
-
 
 public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
 
@@ -80,7 +79,6 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
     }
 
 
-    @Override
     protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
         checkRegEx(databaseName, "databasename");
         try {
@@ -218,13 +216,13 @@ public class Mysql extends JdbcClient implements EventSink<MysqlParameters> {
     }
 
 
-    private StringBuilder extractEventProperties(List<EventProperty> properties)
+    protected StringBuilder extractEventProperties(List<EventProperty> properties)
             throws SpRuntimeException {
         return extractEventProperties(properties, "");
     }
 
 
-    private StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
+    protected StringBuilder extractEventProperties(List<EventProperty> properties, String preProperty)
             throws SpRuntimeException {
 
         StringBuilder s = new StringBuilder();
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
index 68ae54d..c8e52aa 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSql.java
@@ -21,11 +21,10 @@ package org.apache.streampipes.sinks.databases.jvm.postgresql;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
-public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParameters> {
+public class PostgreSql extends PostgresJdbcClient implements EventSink<PostgreSqlParameters> {
 
   private static Logger LOG;
 
@@ -47,7 +46,9 @@ public class PostgreSql extends JdbcClient implements EventSink<PostgreSqlParame
             "^[a-zA-Z_][a-zA-Z0-9_]*$",
             "org.postgresql.Driver",
             "postgresql",
-            LOG);
+            LOG,
+            parameters.getSchemaName(),
+            parameters.isToDropTable());
   }
 
   @Override
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
index b7a5838..356bab6 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlController.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
@@ -38,7 +39,12 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
   private static final String DATABASE_NAME_KEY = "db_name";
   private static final String DATABASE_TABLE_KEY = "db_table";
   private static final String DATABASE_USER_KEY = "db_user";
+  private static final String DATABASE_SCHEMA_KEY = "db_schema";
   private static final String DATABASE_PASSWORD_KEY = "db_password";
+  private static final String DATABASE_REPLACETABLE_KEY = "db_replaceTable";
+
+  private static final String YES = "Yes";
+  private static final String NO = "No";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -54,7 +60,11 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
             .requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
             .requiredTextParameter(Labels.withId(DATABASE_TABLE_KEY))
             .requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
+            .requiredTextParameter(Labels.withId(DATABASE_SCHEMA_KEY))
             .requiredSecret(Labels.withId(DATABASE_PASSWORD_KEY))
+            .requiredSingleValueSelection(
+                Labels.withId(DATABASE_REPLACETABLE_KEY),
+                Options.from(YES, NO))
             .build();
   }
 
@@ -62,12 +72,22 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
   public ConfiguredEventSink<PostgreSqlParameters> onInvocation(DataSinkInvocation graph,
                                                                 DataSinkParameterExtractor extractor) {
 
+
+
     String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, String.class);
     Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, Integer.class);
     String dbName = extractor.singleValueParameter(DATABASE_NAME_KEY, String.class);
     String tableName = extractor.singleValueParameter(DATABASE_TABLE_KEY, String.class);
     String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
     String password = extractor.secretValue(DATABASE_PASSWORD_KEY);
+    String schemaName = extractor.singleValueParameter(DATABASE_SCHEMA_KEY, String.class);
+    String valueOption = extractor.selectedSingleValue(DATABASE_REPLACETABLE_KEY, String.class);
+
+
+    boolean IsToDropTable = false;
+    if (valueOption.equals(YES)) {
+      IsToDropTable = true;
+    }
 
     PostgreSqlParameters params = new PostgreSqlParameters(graph,
             hostname,
@@ -75,7 +95,9 @@ public class PostgreSqlController extends StandaloneEventSinkDeclarer<PostgreSql
             dbName,
             tableName,
             user,
-            password);
+            password,
+            schemaName,
+            IsToDropTable);
 
     return new ConfiguredEventSink<>(params, PostgreSql::new);
   }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
index d45dfdb..2e50fdd 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgreSqlParameters.java
@@ -29,8 +29,10 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
   private String tableName;
   private String user;
   private String password;
+  private String schemaName;
+  private boolean isToDropTable;
 
-  public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password) {
+  public PostgreSqlParameters(DataSinkInvocation graph, String PostgreSqlHost, Integer PostgreSqlPort, String databaseName, String tableName, String user, String password, String schemaName, boolean dropTable) {
     super(graph);
     this.PostgreSqlHost = PostgreSqlHost;
     this.PostgreSqlPort = PostgreSqlPort;
@@ -38,6 +40,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
     this.tableName = tableName;
     this.user = user;
     this.password = password;
+    this.schemaName = schemaName;
+    this.isToDropTable = dropTable;
   }
 
   public String getPostgreSqlHost() {
@@ -63,4 +67,8 @@ public class PostgreSqlParameters extends EventSinkBindingParams {
   public String getPassword() {
     return password;
   }
+
+  public String getSchemaName() { return schemaName; }
+
+  public boolean isToDropTable() { return isToDropTable; }
 }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
new file mode 100644
index 0000000..a70bdf4
--- /dev/null
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql/PostgresJdbcClient.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.postgresql;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyNested;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
+import org.apache.streampipes.vocabulary.XSD;
+
+import java.sql.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class PostgresJdbcClient extends JdbcClient {
+
+  protected String schemaName;
+  protected boolean schemaExists = false;
+  protected boolean isToDropTable = false;
+
+//  /**
+//   * A wrapper class for all supported SQL data types (INT, BIGINT, FLOAT, DOUBLE, VARCHAR(255)).
+//   * If no matching type is found, it is interpreted as a String (VARCHAR(255))
+//   */
+  protected enum SqlAttribute {
+  INTEGER("INT"), LONG("BIGINT"), FLOAT("FLOAT"), DOUBLE("DOUBLE PRECISION"), STRING("TEXT"), BOOLEAN("BOOLEAN");
+  private final String sqlName;
+
+  SqlAttribute(String s) {
+    sqlName = s;
+  }
+}
+//
+//    /**
+//     * Tries to identify the data type of the object {@code o}. In case it is not supported, it is
+//     * interpreted as a String (VARCHAR(255))
+//     *
+//     * @param o The object which should be identified
+//     * @return An {@link SqlAttribute} of the identified type
+//     */
+//    public static SqlAttribute getFromObject(final Object o) {
+//      SqlAttribute r;
+//      if (o instanceof Integer) {
+//        r = SqlAttribute.INTEGER;
+//      } else if (o instanceof Long) {
+//        r = SqlAttribute.LONG;
+//      } else if (o instanceof Float) {
+//        r = SqlAttribute.FLOAT;
+//      } else if (o instanceof Double) {
+//        r = SqlAttribute.DOUBLE;
+//      } else if (o instanceof Boolean) {
+//        r = SqlAttribute.BOOLEAN;
+//      } else {
+//        r = SqlAttribute.STRING;
+//      }
+//      return r;
+//    }
+//
+//    public static SqlAttribute getFromUri(final String s) {
+//      SqlAttribute r;
+//      if (s.equals(XSD._integer.toString())) {
+//        r = SqlAttribute.INTEGER;
+//      } else if (s.equals(XSD._long.toString())) {
+//        r = SqlAttribute.LONG;
+//      } else if (s.equals(XSD._float.toString())) {
+//        r = SqlAttribute.FLOAT;
+//      } else if (s.equals(XSD._double.toString())) {
+//        r = SqlAttribute.DOUBLE;
+//      } else if (s.equals(XSD._boolean.toString())) {
+//        r = SqlAttribute.BOOLEAN;
+//      } else {
+//        r = SqlAttribute.STRING;
+//      }
+//      return r;
+//    }
+//
+//    /**
+//     * Sets the value in the prepardStatement {@code ps}
+//     *
+//     * @param p     The needed info about the parameter (index and type)
+//     * @param value The value of the object, which should be filled in the
+//     * @param ps    The prepared statement, which will be filled
+//     * @throws SpRuntimeException When the data type in {@code p} is unknown
+//     * @throws SQLException       When the setters of the statement throw an
+//     *                            exception (e.g. {@code setInt()})
+//     */
+//    public static void setValue(Parameterinfo p, Object value, PreparedStatement ps)
+//        throws SQLException, SpRuntimeException {
+//      switch (p.type) {
+//        case INTEGER:
+//          ps.setInt(p.index, (Integer) value);
+//          break;
+//        case LONG:
+//          ps.setLong(p.index, (Long) value);
+//          break;
+//        case FLOAT:
+//          ps.setFloat(p.index, (Float) value);
+//          break;
+//        case DOUBLE:
+//          ps.setDouble(p.index, (Double) value);
+//          break;
+//        case BOOLEAN:
+//          ps.setBoolean(p.index, (Boolean) value);
+//          break;
+//        case STRING:
+//          ps.setString(p.index, value.toString());
+//          break;
+//        default:
+//          throw new SpRuntimeException("Unknown SQL datatype");
+//      }
+//    }
+//
+//    @Override
+//    public String toString() {
+//      return sqlName;
+//    }
+//  }
+
+//  /**
+//   * Contains all information needed to "fill" a prepared statement (index and the data type)
+//   */
+//  protected static class Parameterinfo {
+//    private int index;
+//    private SqlAttribute type;
+//
+//    public Parameterinfo(final int index, final SqlAttribute type) {
+//      this.index = index;
+//      this.type = type;
+//    }
+//  }
+
+  public PostgresJdbcClient() {
+    super();
+  }
+
+  protected void initializeJdbc(List<EventProperty> eventProperties,
+                                String host, Integer port,
+                                String databaseName,
+                                String tableName,
+                                String user,
+                                String password,
+                                String allowedRegEx,
+                                String driver,
+                                String urlName,
+                                Logger logger,
+                                String schemaName,
+                                boolean isToDropTable) throws SpRuntimeException {
+    super.initializeJdbc(
+        eventProperties,
+        host,
+        port,
+        databaseName,
+        tableName,
+        user,
+        password,
+        allowedRegEx,
+        driver,
+        urlName,
+        logger);
+    this.schemaName = schemaName;
+    this.isToDropTable = isToDropTable;
+
+    try {
+      Class.forName(driver);
+    } catch (ClassNotFoundException e) {
+      throw new SpRuntimeException("Driver '" + driver + "' not found.");
+    }
+
+    connect();
+    ensureDatabaseExists();
+    ensureSchemaExists();
+    ensureTableExists();
+  }
+
+
+  /**
+   * If this method returns successfully a schema with the name in {@link PostgresJdbcClient#schemaName} exists in the database
+   * with the given database name exists on the server, specified by the url.
+   *
+   * @throws SpRuntimeException If the table does not exist and could not be created
+   */
+  protected void ensureSchemaExists() throws SpRuntimeException {
+    try {
+      // Database should exist by now so we can establish a connection
+      c = DriverManager.getConnection(url + databaseName, user, password);
+      st = c.createStatement();
+      ResultSet rs = c.getMetaData().getSchemas();
+
+      boolean isItExisting = false;
+      while (rs.next()) {
+        String schema = rs.getString("TABLE_SCHEM");
+        if (schema.toLowerCase().equals(schemaName.toLowerCase())) {
+          isItExisting = true;
+        }
+      }
+      if (!isItExisting) {
+        createSchema();
+      }
+
+      schemaExists = true;
+      rs.close();
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+  /**
+   * If this method returns successfully a table with the name in {@link PostgresJdbcClient#tableName} exists in the database
+   * with the given database name exists on the server, specified by the url.
+   *
+   * @throws SpRuntimeException If the table does not exist and could not be created
+   */
+  @Override
+  protected void ensureTableExists() throws SpRuntimeException {
+    try {
+      // Database should exist by now so we can establish a connection
+      c = DriverManager.getConnection(url + databaseName, user, password);
+      st = c.createStatement();
+      ResultSet rs = c.getMetaData().getTables(null, null, tableName, null);
+      while (rs.next()) {
+        // same table names can exists in different schmemas
+        if (rs.getString("TABLE_SCHEM").toLowerCase().equals(schemaName.toLowerCase())) {
+          if (isToDropTable) {
+            createTable();
+          }
+          validateTable();
+        } else {
+          createTable();
+        }
+      }
+      tableExists = true;
+      rs.close();
+    } catch (SQLException e) {
+      closeAll();
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+
+  /**
+   * Prepares a statement for the insertion of values or the
+   *
+   * @param event The event which should be saved to the Postgres table
+   * @throws SpRuntimeException When there was an error in the saving process
+   */
+  @Override
+  protected void save(final Event event) throws SpRuntimeException {
+    //TODO: Add batch support (https://stackoverflow.com/questions/3784197/efficient-way-to-do-batch-inserts-with-jdbc)
+    checkConnected();
+    Map<String, Object> eventMap = event.getRaw();
+    if (event == null) {
+      throw new SpRuntimeException("event is null");
+    }
+
+    if (!schemaExists) {
+      // Creates the schema
+      createSchema();
+      schemaExists = true;
+    }
+
+    if (!tableExists) {
+      // Creates the table
+      createTable();
+      tableExists = true;
+    }
+    try {
+      executePreparedStatement(eventMap);
+    } catch (SQLException e) {
+      if (e.getSQLState().substring(0, 2).equals("42")) {
+        // If the table does not exists (because it got deleted or something, will cause the error
+        // code "42") we will try to create a new one. Otherwise we do not handle the exception.
+        logger.warn("Table '" + tableName + "' was unexpectedly not found and gets recreated.");
+        tableExists = false;
+        createTable();
+        tableExists = true;
+
+        try {
+          executePreparedStatement(eventMap);
+        } catch (SQLException e1) {
+          throw new SpRuntimeException(e1.getMessage());
+        }
+      } else {
+        throw new SpRuntimeException(e.getMessage());
+      }
+    }
+  }
+
+
+  /**
+   * Creates a schema with the name {@link PostgresJdbcClient#schemaName}
+   *
+   * @throws SpRuntimeException If the {@link PostgresJdbcClient#schemaName}  is not allowed, if executeUpdate throws an SQLException
+   */
+  protected void createSchema() throws SpRuntimeException {
+    checkConnected();
+    checkRegEx(tableName, "Tablename");
+
+    StringBuilder statement = new StringBuilder("CREATE SCHEMA ");
+    statement.append(schemaName).append(";");
+    try {
+      st.executeUpdate(statement.toString());
+    } catch (SQLException e) {
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  protected void createTable() throws SpRuntimeException {
+    checkConnected();
+    checkRegEx(tableName, "Tablename");
+
+    if (isToDropTable) {
+      StringBuilder statement = new StringBuilder("DROP TABLE IF EXISTS ");
+      statement.append(schemaName);
+      statement.append(".");
+      statement.append(tableName);
+      statement.append(";");
+
+      try {
+        st.executeUpdate(statement.toString());
+      } catch (SQLException e) {
+        throw new SpRuntimeException(e.getMessage());
+      }
+    }
+
+    StringBuilder statement = new StringBuilder("CREATE TABLE ");
+    statement.append(schemaName);
+    statement.append(".");
+    statement.append(tableName).append(" ( ");
+    statement.append(extractEventProperties(eventProperties)).append(" );");
+
+    try {
+      st.executeUpdate(statement.toString());
+    } catch (SQLException e) {
+      e.getErrorCode();
+      if (e.getSQLState().equals("42P07")) {
+        throw new SpRuntimeException("Table already exists. Change option \"DROP TABLE\" to prevent this error. Error Message: " + e.getMessage());
+      } else {
+        throw new SpRuntimeException("Something went wrong during table creation with error message: " + e.getMessage());
+      }
+    }
+  }
+}