You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@streampipes.apache.org by Florian Micklich <fl...@disy.net.INVALID> on 2020/02/19 19:27:47 UTC

postgres sink

Hi all,


I was just using the postgres sink [0] and got an error.

I am using following docker container:

docker run --name "streampipes_postgis" -e POSTGRES_USER=streampipes -e POSTGRES_PASS=streampipes -e POSTGRES_DBNAME=streampipes -p 65432:5432 -d -t kartoza/postgis



The database is created and also the table. But saving the events in the DB is not working.

19:54:00.875 SP [Thread-2] WARN  o.a.s.s.d.jvm.postgresql.PostgreSql - USERLOG - correspondingPipeline: 839d7efd-d561-4731-80f9-343610fcdc5d - peURI: http://172.17.0.1:8005/sec/org.apache.streampipes.sinks.databases.jvm.postgresql/839d7efd-d561-4731-80f9-343610fcdc5d-org.streampipes.connect.ebf6c159-7576-4f7d-8e43-a79d4b5f8080-postgresql-0 - Table 'testtable' was unexpectedly not found and gets recreated.
19:54:00.880 SP [Thread-2] ERROR o.a.s.s.d.jvm.postgresql.PostgreSql - USERLOG - correspondingPipeline: 839d7efd-d561-4731-80f9-343610fcdc5d - peURI: http://172.17.0.1:8005/sec/org.apache.streampipes.sinks.databases.jvm.postgresql/839d7efd-d561-4731-80f9-343610fcdc5d-org.streampipes.connect.ebf6c159-7576-4f7d-8e43-a79d4b5f8080-postgresql-0 - ERROR: relation "testtable" already exists

The last message appears after every event is saved.

I had a quick look in the code but not able to find the reason so far. The code changes a lot compared to the last time I looked at it.




In the ensureDatabaseExists method in the jdbcClient I also saw a comment:

// Checks whether the database already exists (using catalogs has not worked with postgres)


If I use following query, I can check in postgres if a  database, table or even schema already exists. Maybe this is helpful???


String checkTableName = "SELECT EXISTS (SELECT table_name FROM information_schema.tables WHERE table_schema = '"+ schemaName + "' AND table_name = '"+tableName+"') AS result;";
String checkDatabaseName = "SELECT EXISTS (SELECT 1 FROM pg_database WHERE datname = '"+ databaseName + "') AS result;";
String checkSchemaName = "SELECT EXISTS (SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = '" + schemaName +"') AS result;";


I used this method:


    private boolean checkExistInPG(Connection conn, String query) {

        boolean exists = false;

        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(query)){


            if(rs.next()) {
                exists = rs.getBoolean(1);
            }

        } catch (SQLException e) {

            throw new SpRuntimeException("Check if database, table or schema  exists went wrong: " + e.getSQLState() +"\n" + e.getMessage());
            //e.printStackTrace();
        } finally {
            return exists;
        }
    }



I would also like to start a discussion about extending the postgres sink.

Would it be a good idea to support the user input "db schema" as well? At the moment the table is only written in the public schema.

I saw that the  jdbcClient is also used for the iotdb? Would this be compatible? Is this also a postgres db?

I am asking because I am thinking to extend the Postgres with the PostGIS extension as well.


Sorry this email is longer than expected :-D


Kind regards

Florian

[0] https://github.com/apache/incubator-streampipes-extensions/tree/dev/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/postgresql


Disy Informationssysteme GmbH
Florian Micklich
Lösungsentwickler
+49 721 16006 477,  florian.micklich@disy.net

Firmensitz: Ludwig-Erhard-Allee 6, 76131 Karlsruhe
Registergericht: Amtsgericht Mannheim, HRB 107964
Geschäftsführer: Claus Hofmann

Bitte beachten Sie folgende Informationen für Kunden, Lieferanten und Bewerber
- Datenschutz: www.disy.net/datenschutz
- Informationspflichten:  www.disy.net/informationspflichten