Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/03 17:47:25 UTC

[camel-kafka-connector-examples] 01/01: Added a PgEvent Source connector example

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch pgevent-source
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git

commit d0522521e859ba0f2ff37cd5f67286f9bef92d4a
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Nov 3 18:46:57 2020 +0100

    Added a PgEvent Source connector example
---
 pgevent/pgevent-source/README.adoc                 | 196 +++++++++++++++++++++
 .../config/CamelPgeventSourceConnector.properties  |  31 ++++
 2 files changed, 227 insertions(+)

diff --git a/pgevent/pgevent-source/README.adoc b/pgevent/pgevent-source/README.adoc
new file mode 100644
index 0000000..f8ff5f0
--- /dev/null
+++ b/pgevent/pgevent-source/README.adoc
@@ -0,0 +1,196 @@
+= Camel-Kafka-connector PGEvent Source
+
+This example shows how to use the Camel-Kafka-connector PGEvent Source.
+
+== Standalone
+
+=== What is needed
+
+- A local Apache Kafka installation (referred to below as `$KAFKA_HOME`)
+- A PostgreSQL instance running in Docker
+
+=== Running Kafka
+
+[source]
+----
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+----
+
+=== Download the connector package
+
+Download the connector package zip and extract the content to a directory. In this example we'll use `/home/oscerd/connectors/`
+
+[source]
+----
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-pgevent-kafka-connector/0.6.0/camel-pgevent-kafka-connector-0.6.0-package.zip
+> unzip camel-pgevent-kafka-connector-0.6.0-package.zip
+----
+
+=== Configuring Kafka Connect
+
+You'll need to set the `plugin.path` property in your Kafka Connect configuration.
+
+Open `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your chosen location:
+
+[source]
+----
+...
+plugin.path=/home/oscerd/connectors
+...
+----
+
+=== Set up the Docker container
+
+We'll need a fully running PostgreSQL instance.
+
+The first step is to start it:
+
+[source]
+----
+> docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres
+6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
+----
+
+Take note of the container ID.
+Next, we need to create the table we'll use, defined as follows:
+
+[source]
+----
+CREATE TABLE accounts (
+	user_id serial PRIMARY KEY,
+	username VARCHAR ( 50 ) UNIQUE NOT NULL,
+	city VARCHAR ( 50 ) NOT NULL
+);
+----
+
+We can now create the table from a psql session inside the container:
+
+[source]
+----
+> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
+psql (13.0 (Debian 13.0-1.pgdg100+1))
+Type "help" for help.
+
+postgres=# CREATE TABLE accounts (
+postgres(# user_id serial PRIMARY KEY,
+postgres(# username VARCHAR ( 50 ) UNIQUE NOT NULL,
+postgres(# city VARCHAR ( 50 ) NOT NULL
+postgres(# );
+----
+
+Now we need to create a trigger that issues a notify command on the channel of our choice. Here we name the channel after the table we created: `accounts`.
+
+The trigger function and the trigger are the following:
+
+[source]
+----
+CREATE OR REPLACE FUNCTION row_inserted()
+  RETURNS trigger AS $$
+DECLARE
+BEGIN
+  PERFORM pg_notify(
+    'accounts',
+    row_to_json(NEW)::text);
+  RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER row_inserted
+  AFTER INSERT ON accounts
+  FOR EACH ROW
+  EXECUTE PROCEDURE row_inserted();
+----
+
+We need to execute both statements in our psql session:
+
+[source]
+----
+> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
+psql (13.0 (Debian 13.0-1.pgdg100+1))
+Type "help" for help.
+
+postgres=# CREATE OR REPLACE FUNCTION row_inserted()
+postgres-#   RETURNS trigger AS $$
+postgres$# DECLARE
+postgres$# BEGIN
+postgres$#   PERFORM pg_notify(
+postgres$#     'accounts',
+postgres$#     row_to_json(NEW)::text);
+postgres$#   RETURN NEW;
+postgres$# END;
+postgres$# $$ LANGUAGE plpgsql;
+CREATE FUNCTION
+postgres=# 
+postgres=# CREATE TRIGGER row_inserted
+postgres-#   AFTER INSERT ON accounts
+postgres-#   FOR EACH ROW
+postgres-#   EXECUTE PROCEDURE row_inserted();
+CREATE TRIGGER
+----
+
+We also need to take note of the container IP address:
+
+----
+> docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
+172.17.0.2
+----
+
+=== Set up the connector
+
+Open the PGEvent configuration file at `$EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties`
+
+[source]
+----
+name=CamelPgeventSourceConnector
+connector.class=org.apache.camel.kafkaconnector.pgevent.CamelPgeventSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.source.endpoint.user=postgres
+camel.source.endpoint.pass=mysecretpassword
+camel.source.path.port=5432
+camel.source.path.host=172.17.0.2
+
+camel.source.path.channel=accounts
+camel.source.path.database=postgres
+----
+
+Set `camel.source.path.host` to the container IP address noted earlier.
+
+=== Running the example
+
+Run Kafka Connect with the PGEvent Source connector:
+
+[source]
+----
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties
+----
+
+Now we need to run some inserts in our database:
+
+[source]
+----
+> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
+postgres=# insert into accounts(username,city) values('andrea','Roma');
+INSERT 0 1
+Asynchronous notification "accounts" with payload "{"user_id":1,"username":"andrea","city":"Roma"}" received from server process with PID 152.
+postgres=# insert into accounts(username,city) values('John','New York');
+INSERT 0 1
+Asynchronous notification "accounts" with payload "{"user_id":2,"username":"John","city":"New York"}" received from server process with PID 152.
+----
+
+In a different terminal, run the kafkacat consumer:
+
+[source]
+----
+> ./kafkacat -b  localhost:9092 -t mytopic
+{"user_id":1,"username":"andrea","city":"Roma"}
+{"user_id":2,"username":"John","city":"New York"}
+% Reached end of topic mytopic [0] at offset 2
+----
+
diff --git a/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties b/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties
new file mode 100644
index 0000000..e172e28
--- /dev/null
+++ b/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelPgeventSourceConnector
+connector.class=org.apache.camel.kafkaconnector.pgevent.CamelPgeventSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.source.endpoint.user=postgres
+camel.source.endpoint.pass=mysecretpassword
+camel.source.path.port=5432
+camel.source.path.host=172.17.0.2
+
+camel.source.path.channel=accounts
+camel.source.path.database=postgres
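
Note on the configuration above: the `camel.source.path.*` keys appear to fill the path segments of the Camel endpoint URI, while the `camel.source.endpoint.*` keys become query options. A minimal sketch of that mapping follows, assuming the pgevent URI layout `pgevent:host:port/database/channel`. The helper names `prop_value` and `pgevent_uri` are hypothetical and not part of the connector. The password is deliberately left out of the printed URI.

```shell
#!/bin/sh
# Sketch only: shows how the connector properties are assumed to map onto
# a pgevent endpoint URI. These helpers are illustrative, not connector code.

# prop_value KEY FILE
# Prints the value of the first "KEY=value" line in a properties file.
# The key is matched literally at the start of the line.
prop_value() {
  awk -v k="$1" 'index($0, k "=") == 1 { print substr($0, length(k) + 2); exit }' "$2"
}

# pgevent_uri FILE
# Builds pgevent:host:port/database/channel?user=... from camel.source.* keys.
pgevent_uri() {
  f=$1
  host=$(prop_value camel.source.path.host "$f")
  port=$(prop_value camel.source.path.port "$f")
  db=$(prop_value camel.source.path.database "$f")
  channel=$(prop_value camel.source.path.channel "$f")
  user=$(prop_value camel.source.endpoint.user "$f")
  printf 'pgevent:%s:%s/%s/%s?user=%s\n' "$host" "$port" "$db" "$channel" "$user"
}

# Usage:
#   pgevent_uri "$EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties"
```

Printing the derived URI before starting Kafka Connect is a quick way to check that the host, port, database and channel values match the Docker container and the channel used by the trigger.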