Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/05/18 00:08:12 UTC

[GitHub] [nifi] davyam opened a new pull request, #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

davyam opened a new pull request, #6053:
URL: https://github.com/apache/nifi/pull/6053

   <!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
         http://www.apache.org/licenses/LICENSE-2.0
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
   -->
   NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes from PostgreSQL tables via Logical Replication API
   
   Authors:
   Davy Machado machado.davy@gmail.com
   Gerdan Santos gerdan@gmail.com
   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   The CaptureChangePostgreSQL processor retrieves Change Data Capture (CDC) events from a PostgreSQL database. It works with PostgreSQL version 10+. Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files, ordered by the time at which the operation occurred. The processor uses a Logical Replication Connection to stream data and a SQL Connection to query system views.
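   
   As background for reviewers, here is a minimal sketch of how a logical replication stream is opened with the PostgreSQL JDBC driver, which is the API this processor builds on. The URL, credentials, slot name (`my_slot`), and publication name (`my_pub`) are hypothetical, the slot and publication must already exist, and the server must run with `wal_level=logical`:
   
   ```java
   import java.nio.ByteBuffer;
   import java.sql.Connection;
   import java.sql.DriverManager;
   import java.util.Properties;
   
   import org.postgresql.PGConnection;
   import org.postgresql.PGProperty;
   import org.postgresql.replication.PGReplicationStream;
   
   public class LogicalReplicationSketch {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           PGProperty.USER.set(props, "replication_user");
           PGProperty.PASSWORD.set(props, "password");
           PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "10");
           PGProperty.REPLICATION.set(props, "database");     // open a replication connection
           PGProperty.PREFER_QUERY_MODE.set(props, "simple"); // required by the replication protocol
   
           Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/mydb", props);
           PGReplicationStream stream = conn.unwrap(PGConnection.class)
                   .getReplicationAPI()
                   .replicationStream()
                   .logical()
                   .withSlotName("my_slot")                       // hypothetical, pre-created slot
                   .withSlotOption("proto_version", 1)
                   .withSlotOption("publication_names", "my_pub") // hypothetical publication
                   .start();
   
           // Each buffer is one pgoutput message; the first byte identifies the
           // message type (B, C, R, I, U, D, ...), which the Decoder class below handles.
           ByteBuffer buffer = stream.readPending();
           if (buffer != null) {
               System.out.println("Message type: " + (char) buffer.get());
           }
       }
   }
   ```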
   
   This new pull request builds upon PR #5710.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [X] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [X] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [X] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [X] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [X] Have you written or updated unit tests to verify your changes?
   - [X] Have you verified that the full build is successful on JDK 8?
   - [X] Have you verified that the full build is successful on JDK 11?
   - [X] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [X] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [X] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [X] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [X] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] djshura2008 commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
djshura2008 commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1134184216

   > > I have a problem.
   > 
   > Hey @djshura2008, you are using a previous version of the CaptureChangePostgreSQL processor, maybe from PR 4065. I know that because we no longer have the "Take an initial snapshot?" property. Since then, a lot of improvements have been made, so please repeat the tests with the latest version. Thanks for your support.
   
   Okay, but how do you take an initial snapshot of the database?
   




[GitHub] [nifi] djshura2008 commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
djshura2008 commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1132542665

   I have a problem. I have a clean NiFi instance running in Docker,
   with these settings in bootstrap.conf:
   # JVM memory settings
   java.arg.2=-Xms2048m
   java.arg.3=-Xmx4096m
   
   Initially, `docker stats` displays:
   CONTAINER ID   NAME          CPU %    MEM USAGE / LIMIT     MEM %    NET I/O          BLOCK I/O       PIDS
   5017cf957d26   nifi_docker   0.60%    2.185GiB / 7.771GiB   28.12%   40.5kB / 121kB   757MB / 995kB   109
   
   But as soon as I start the processor against a 370 MB table, `docker stats` displays:
   CONTAINER ID   NAME          CPU %    MEM USAGE / LIMIT     MEM %    NET I/O           BLOCK I/O        PIDS
   5017cf957d26   nifi_docker   26.17%   4.985GiB / 7.771GiB   64.15%   1.15GB / 26.4MB   767MB / 1.01MB   113
   
   And then I start getting the error: Java heap space.
   
   Here are my processor settings:
   ![image](https://user-images.githubusercontent.com/62784748/169470210-5937d210-e3a0-451c-a79e-6931e3650b09.png)
   ![image](https://user-images.githubusercontent.com/62784748/169470504-dd15bf5e-7b5b-4a4c-acb9-5f8e07985f15.png)
   ![image](https://user-images.githubusercontent.com/62784748/169470702-0fcf175b-b0f8-43f4-b2b1-278bb204c170.png)
   
   
   




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6053:
URL: https://github.com/apache/nifi/pull/6053#discussion_r875871477


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/pom.xml:
##########
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cdc-postgresql-bundle</artifactId>
+        <version>1.17.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-cdc-postgresql-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cdc-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   These dependencies can be removed since they are included by default in the root Maven configuration.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and returns a
+     * key-value message.
+     *
+     * @param buffer
+     *                           Event change encoded.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, message returned for these events is
+     *                           empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) are not included in the message. Also,
+     *                           message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to a String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);

Review Comment:
   ```suggestion
                   String relationDefinition = new String(bytes_R, StandardCharsets.UTF_8);
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and returns a
+     * key-value message.
+     *
+     * @param buffer
+     *                           Event change encoded.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, message returned for these events is
+     *                           empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) are not included in the message. Also,
+     *                           message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to a String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_U);
+                }
+
+                message.put("relationName", this.relations.get(relationId_U).getObjectName());
+
+                /*
+                 * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                 * or as an old tuple ('O') or as a new tuple ('N').
+                 */
+                char tupleType1 = (char) buffer.get(position);
+                position += 1;
+
+                if (includeAllMetadata) {
+                    message.put("tupleType1", tupleType1);
+                }
+
+                /* TupleData N, K or O */
+                Object[] tupleData1 = parseTupleData(relationId_U, buffer, position);
+
+                if (includeAllMetadata) {
+                    message.put("tupleData1", tupleData1[0]);
+                }
+
+                if (tupleType1 == 'N') {
+                    if (!includeAllMetadata) {
+                        /* TupleData N */
+                        message.put("tupleData", tupleData1[0]);
+                    }
+
+                    return message;
+                }
+
+                position = (Integer) tupleData1[1];
+
+                if (includeAllMetadata) {
+                    char tupleType2 = (char) buffer.get(position);
+
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K') or
+                     * as an old tuple ('O') or as a new tuple ('N').
+                     */
+                    message.put("tupleType2", tupleType2);
+                }
+
+                position += 1;
+
+                if (includeAllMetadata) {
+                    /* TupleData N */
+                    message.put("tupleData2", parseTupleData(relationId_U, buffer, position)[0]);
+                } else {
+                    /* TupleData N */
+                    message.put("tupleData", parseTupleData(relationId_U, buffer, position)[0]);
+                }
+
+                return message;
+
+            /* Identifies the message as a delete message. */
+            case 'D':
+
+                message.put("type", "delete");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_D = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_D);
+                }
+
+                message.put("relationName", this.relations.get(relationId_D).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                     * or as an old tuple ('O').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(position));
+                }
+
+                position += 1;
+
+                /* TupleData */
+                message.put("tupleData", parseTupleData(relationId_D, buffer, position)[0]);
+
+                return message;
+
+            default:
+
+                message.put("type", "error");
+                message.put("description", "Unknown message type \"" + msgType + "\".");
+                return message;
+        }
+    }
+
+    /**
+     * Decodes the tuple data (row) included in the binary buffer for INSERT,
+     * UPDATE and DELETE events.
+     *
+     * @param relationId
+     *                   Table ID used to get the table name from the Decoder
+     *                   relations field.
+     * @param buffer
+     *                   Binary buffer of the event.
+     * @param position
+     *                   Position at which the tuple data starts.
+     * @return Object[]
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to a String
+     */
+    private Object[] parseTupleData(int relationId, ByteBuffer buffer, int position)
+            throws UnsupportedEncodingException {
+
+        HashMap<String, Object> data = new HashMap<String, Object>();
+        Object[] result = { data, position };
+
+        /* (Int16) Number of columns. */
+        short columns = buffer.getShort(position);
+        /* short = 2 bytes */
+        position += 2;
+
+        for (int i = 1; i <= columns; i++) {
+            /*
+             * (Byte1) Either identifies the data as NULL value ('n') or unchanged TOASTed
+             * value ('u') or text formatted value ('t').
+             */
+            char dataFormat = (char) buffer.get(position);
+            /* byte = 1 byte */
+            position += 1;
+
+            Column column = relations.get(relationId).getColumn(i);
+
+            if (dataFormat == 't') {
+                /* (Int32) Length of the column value. */
+                int lenValue = buffer.getInt(position);
+                /* int = 4 bytes */
+                position += 4;
+
+                buffer.position(position);
+                byte[] bytes = new byte[lenValue];
+                buffer.get(bytes);
+                /* String = length * bytes */
+                position += lenValue;
+
+                /*
+                 * (ByteN) The value of the column, in text format.
+                 * Numeric types are not quoted.
+                 */
+                if (column.getDataTypeName() != null && column.getDataTypeName().startsWith("int")) {
+                    data.put(column.getName(), Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
+                } else {
+                    /* (ByteN) The value of the column, in text format. */
+                    data.put(column.getName(), new String(bytes, StandardCharsets.UTF_8));
+                }
+
+            } else { /* dataFormat = 'n' (NULL value) or 'u' (unchanged TOASTED value) */
+                if (dataFormat == 'n') {
+                    data.put(column.getName(), null);
+                } else {
+                    data.put(column.getName(), "UTOAST");
+                }
+            }
+        }
+
+        result[0] = data;
+        result[1] = position;
+
+        return result;
+    }
+
+    /**
+     * Converts a PostgreSQL epoch timestamp to a human-readable date format.
+     *
+     * @param microseconds
+     *                     Microseconds since 2000-01-01 00:00:00.000.
+     * @return String
+     * @throws ParseException
+     *                        if fails to parse start date
+     */
+    private String getFormattedPostgreSQLEpochDate(long microseconds) throws ParseException {
+        Date pgEpochDate = new SimpleDateFormat("yyyy-MM-dd").parse("2000-01-01");

Review Comment:
   Use of SimpleDateFormat should be replaced with DateTimeFormatter. This particular calculation should be done once and stored as a static value.
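   
   For illustration, a sketch of that refactoring (names are suggestions, not the final implementation):
   
   ```java
   import java.time.OffsetDateTime;
   import java.time.ZoneId;
   import java.time.ZoneOffset;
   import java.time.format.DateTimeFormatter;
   import java.time.temporal.ChronoUnit;
   
   class EpochFormattingSketch {
       /* The PostgreSQL epoch (2000-01-01 00:00:00 UTC), computed once. */
       private static final OffsetDateTime POSTGRES_EPOCH =
               OffsetDateTime.of(2000, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
   
       /* DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat. */
       private static final DateTimeFormatter COMMIT_TIME_FORMATTER =
               DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z Z");
   
       static String getFormattedPostgreSQLEpochDate(long microseconds) {
           // Add the microsecond offset directly; no parsing or Calendar arithmetic needed.
           return POSTGRES_EPOCH.plus(microseconds, ChronoUnit.MICROS)
                   .atZoneSameInstant(ZoneId.systemDefault())
                   .format(COMMIT_TIME_FORMATTER);
       }
   }
   ```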



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/test/java/org/apache/nifi/cdc/postgresql/processors/TestCaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.postgresql.processors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.nifi.cdc.postgresql.event.MockReader;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;

Review Comment:
   JUnit 5 is now the preferred version; can you make the adjustments to use the JUnit 5 classes?
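   
   For reference, a sketch of how those JUnit 4 imports would map to JUnit 5 (assuming the enforced test ordering is still wanted; ideally the tests would be made order-independent instead):
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertNotNull;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   
   import org.junit.jupiter.api.AfterEach;      // replaces org.junit.After
   import org.junit.jupiter.api.BeforeEach;     // replaces org.junit.Before
   import org.junit.jupiter.api.MethodOrderer;
   import org.junit.jupiter.api.Test;           // replaces org.junit.Test
   import org.junit.jupiter.api.TestMethodOrder;
   
   // @FixMethodOrder(MethodSorters.NAME_ASCENDING) on the test class becomes:
   // @TestMethodOrder(MethodOrderer.MethodName.class)
   ```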



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and returns a
+     * key-value message.
+     *
+     * @param buffer
+     *                           Event change encoded.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, message returned for these events is
+     *                           empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) are not included in the message. Also,
+     *                           message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to a String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_U);
+                }
+
+                message.put("relationName", this.relations.get(relationId_U).getObjectName());
+
+                /*
+                 * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                 * or as an old tuple ('O') or as a new tuple ('N').
+                 */
+                char tupleType1 = (char) buffer.get(position);
+                position += 1;
+
+                if (includeAllMetadata) {
+                    message.put("tupleType1", tupleType1);
+                }
+
+                /* TupleData N, K or O */
+                Object[] tupleData1 = parseTupleData(relationId_U, buffer, position);
+
+                if (includeAllMetadata) {
+                    message.put("tupleData1", tupleData1[0]);
+                }
+
+                if (tupleType1 == 'N') {
+                    if (!includeAllMetadata) {
+                        /* TupleData N */
+                        message.put("tupleData", tupleData1[0]);
+                    }
+
+                    return message;
+                }
+
+                position = (Integer) tupleData1[1];
+
+                if (includeAllMetadata) {
+                    char tupleType2 = (char) buffer.get(position);
+
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K') or
+                     * as an old tuple ('O') or as a new tuple ('N').
+                     */
+                    message.put("tupleType2", tupleType2);
+                }
+
+                position += 1;
+
+                if (includeAllMetadata) {
+                    /* TupleData N */
+                    message.put("tupleData2", parseTupleData(relationId_U, buffer, position)[0]);
+                } else {
+                    /* TupleData N */
+                    message.put("tupleData", parseTupleData(relationId_U, buffer, position)[0]);
+                }
+
+                return message;
+
+            /* Identifies the message as a delete message. */
+            case 'D':
+
+                message.put("type", "delete");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_D = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_D);
+                }
+
+                message.put("relationName", this.relations.get(relationId_D).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                     * or as an old tuple ('O').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(position));
+                }
+
+                position += 1;
+
+                /* TupleData */
+                message.put("tupleData", parseTupleData(relationId_D, buffer, position)[0]);
+
+                return message;
+
+            default:
+
+                message.put("type", "error");
+                message.put("description", "Unknown message type \"" + msgType + "\".");
+                return message;
+        }
+    }
+
+    /**
+     * Decodes the tuple data (row) included in the binary buffer for INSERT,
+     * UPDATE and DELETE events.
+     *
+     * @param relationId
+     *                   Table ID used to get the table name from the Decoder
+     *                   relations field.
+     * @param buffer
+     *                   Binary buffer of the event.
+     * @param position
+     *                   Position at which the tuple data starts.
+     * @return Object[]
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to a String
+     */
+    private Object[] parseTupleData(int relationId, ByteBuffer buffer, int position)
+            throws UnsupportedEncodingException {
+
+        HashMap<String, Object> data = new HashMap<String, Object>();
+        Object[] result = { data, position };
+
+        /* (Int16) Number of columns. */
+        short columns = buffer.getShort(position);
+        /* short = 2 bytes */
+        position += 2;
+
+        for (int i = 1; i <= columns; i++) {
+            /*
+             * (Byte1) Either identifies the data as NULL value ('n') or unchanged TOASTed
+             * value ('u') or text formatted value ('t').
+             */
+            char dataFormat = (char) buffer.get(position);
+            /* byte = 1 byte */
+            position += 1;
+
+            Column column = relations.get(relationId).getColumn(i);
+
+            if (dataFormat == 't') {
+                /* (Int32) Length of the column value. */
+                int lenValue = buffer.getInt(position);
+                /* int = 4 bytes */
+                position += 4;
+
+                buffer.position(position);
+                byte[] bytes = new byte[lenValue];
+                buffer.get(bytes);
+                /* String = length * bytes */
+                position += lenValue;
+
+                /*
+                 * (ByteN) The value of the column, in text format.
+                 * Numeric types are not quoted.
+                 */
+                String dataTypeName = column.getDataTypeName();
+
+                /* int2, int4 and int8 only; "interval" also starts with "int" */
+                if (dataTypeName != null && dataTypeName.matches("int[248]")) {
+                    data.put(column.getName(), Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
+                } else {
+                    data.put(column.getName(), new String(bytes, StandardCharsets.UTF_8));
+                }
+
+            } else { /* dataFormat = 'n' (NULL value) or 'u' (unchanged TOASTED value) */
+                if (dataFormat == 'n') {
+                    data.put(column.getName(), null);
+                } else {
+                    data.put(column.getName(), "UTOAST");
+                }
+            }
+        }
+
+        result[0] = data;
+        result[1] = position;
+
+        return result;
+    }
+
+    /**
+     * Converts a PostgreSQL epoch timestamp to a human-readable date format.
+     *
+     * @param microseconds
+     *                     Microseconds since 2000-01-01 00:00:00.000.
+     * @return String
+     * @throws ParseException
+     *                        if it fails to parse the epoch start date
+     */
+    private String getFormattedPostgreSQLEpochDate(long microseconds) throws ParseException {
+        Date pgEpochDate = new SimpleDateFormat("yyyy-MM-dd").parse("2000-01-01");
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(pgEpochDate);
+        cal.add(Calendar.SECOND, (int) (microseconds / 1000000));
+
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z Z").format(cal.getTime());
+    }
+
+    /**
+     * Indicates whether the dataTypes field is empty.
+     *
+     * @return boolean
+     */
+    public boolean isDataTypesEmpty() {
+        return this.dataTypes.isEmpty();
+    }
+
+    /**
+     * Loads the PostgreSQL data types from the pg_catalog.pg_type system view.
+     * This set is used to enrich messages with the data type name.
+     *
+     * @param queryConnection
+     *                        Connection to the PostgreSQL database.
+     * @throws SQLException
+     *                      if it fails to access the PostgreSQL database
+     */
+    public void loadDataTypes(Connection queryConnection) throws SQLException {
+        try (Statement stmt = queryConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+                ResultSet rs = stmt.executeQuery("SELECT oid, typname FROM pg_catalog.pg_type")) {
+            while (rs.next()) {
+                this.dataTypes.put(rs.getInt(1), rs.getString(2));
+            }
+        }
+    }
+
+    /**
+     * Manually sets the data types map used to enrich messages.
+     *
+     * @param dataTypes
+     *                  A map of data type IDs to data type names.
+     */
+    public void setDataTypes(HashMap<Integer, String> dataTypes) {
+        this.dataTypes = dataTypes;
+    }

Review Comment:
   These methods for loading data types should be moved to a new class. The `setDataTypes()` method can remain here, but the retrieval of data types from a JDBC Connection belongs in a separate class.
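   
   One possible shape for that refactoring: a minimal sketch assuming a new helper class named `DataTypeLoader` (the class and method names are hypothetical, not part of this PR):
   ```java
   package org.apache.nifi.cdc.postgresql.event;
   
   import java.sql.Connection;
   import java.sql.ResultSet;
   import java.sql.SQLException;
   import java.sql.Statement;
   import java.util.HashMap;
   
   public class DataTypeLoader {
   
       /**
        * Loads the PostgreSQL data type names, keyed by type OID, from the
        * pg_catalog.pg_type system view.
        */
       public static HashMap<Integer, String> load(Connection queryConnection) throws SQLException {
           HashMap<Integer, String> dataTypes = new HashMap<>();
           try (Statement stmt = queryConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
                   ResultSet rs = stmt.executeQuery("SELECT oid, typname FROM pg_catalog.pg_type")) {
               while (rs.next()) {
                   dataTypes.put(rs.getInt(1), rs.getString(2));
               }
           }
           return dataTypes;
       }
   }
   ```
   The processor could then call `decoder.setDataTypes(DataTypeLoader.load(queryConnection))`, keeping the Decoder itself free of JDBC concerns.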



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the logical
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and
+     * returns a key-value message.
+     *
+     * @param buffer
+     *                           The encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and
+     *                           TYPE events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if it fails to parse PostgreSQL
+     *                                      epoch dates
+     * @throws UnsupportedEncodingException
+     *                                      if it fails to convert bytes
+     *                                      to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];

Review Comment:
   Use of the underscore character should be avoided in variable names.
   ```suggestion
                       byte[] origin = new byte[buffer.remaining()];
   ```
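   
   For reference, the rename applied across the whole `O` branch would read as follows (illustrative only; behavior unchanged):
   ```java
   buffer.position(9);
   byte[] origin = new byte[buffer.remaining()];
   buffer.get(origin);
   
   /* (String) Name of the origin. */
   message.put("originName", new String(origin, StandardCharsets.UTF_8));
   ```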



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);

Review Comment:
   ```suggestion
                   int insertId = buffer.getInt(position);
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];

Review Comment:
   ```suggestion
                       byte[] type = new byte[buffer.capacity()];
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);

Review Comment:
   ```suggestion
                       String typeDefinition = new String(bytes_Y, StandardCharsets.UTF_8);
   ```
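   
   Since this NUL-terminated-string scan recurs in several branches (`R` and `Y`), it could also be factored into a small helper; a hypothetical sketch, not part of this PR:
   ```java
   /* Reads the NUL-terminated string that starts at the given position. */
   private static String readNullTerminatedString(String decoded, int position) {
       return decoded.substring(position, decoded.indexOf(0, position));
   }
   ```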



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);

Review Comment:
   ```suggestion
                   int updateRelationId = buffer.getInt(position);
   ```
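   
   As an aside, since the optional K/O submessage handling is the subtlest part of this decoder, here is a minimal, self-contained sketch (not part of this PR) of the pgoutput UPDATE layout the decoder expects, assuming a single text-formatted column and no old-tuple submessage (the common REPLICA IDENTITY DEFAULT case); the relation OID is a made-up example:
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   
   public class UpdateMessageLayout {
       public static void main(String[] args) {
           byte[] value = "42".getBytes(StandardCharsets.UTF_8);
   
           ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + 1 + 2 + 1 + 4 + value.length);
           buffer.put((byte) 'U');        // Byte1: message type (update)
           buffer.putInt(16385);          // Int32: relation ID (example OID)
           buffer.put((byte) 'N');        // Byte1: marks the following TupleData as the new tuple
           buffer.putShort((short) 1);    // Int16: number of columns in the TupleData
           buffer.put((byte) 't');        // Byte1: the column value is text-formatted
           buffer.putInt(value.length);   // Int32: length of the column value
           buffer.put(value);             // ByteN: the value itself, in text format
   
           System.out.println("Encoded UPDATE message: " + buffer.position() + " bytes");
       }
   }
   ```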



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the decoding
+ * output plugin (pgoutput). Decoder also uses the dataTypes field to enrich
+ * RELATION messages with the name of each column's data type, and the relations
+ * field to enrich INSERT, UPDATE and DELETE messages with the relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and
+     * returns a key-value message.
+     *
+     * @param buffer
+     *                           The encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and
+     *                           TYPE events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL epoch
+     *                                      dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if conversion of bytes to String
+     *                                      fails
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of column in the table. Index-based 1. */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_U);
+                }
+
+                message.put("relationName", this.relations.get(relationId_U).getObjectName());
+
+                /*
+                 * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                 * or as an old tuple ('O') or as a new tuple ('N').
+                 */
+                char tupleType1 = (char) buffer.get(position);
+                position += 1;
+
+                if (includeAllMetadata) {
+                    message.put("tupleType1", tupleType1);
+                }
+
+                /* TupleData N, K or O */
+                Object[] tupleData1 = parseTupleData(relationId_U, buffer, position);
+
+                if (includeAllMetadata) {
+                    message.put("tupleData1", tupleData1[0]);
+                }
+
+                if (tupleType1 == 'N') {
+                    if (!includeAllMetadata) {
+                        /* TupleData N */
+                        message.put("tupleData", tupleData1[0]);
+                    }
+
+                    return message;
+                }
+
+                position = (Integer) tupleData1[1];
+
+                if (includeAllMetadata) {
+                    char tupleType2 = (char) buffer.get(position);
+
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K') or
+                     * as an old tuple ('O') or as a new tuple ('N').
+                     */
+                    message.put("tupleType2", tupleType2);
+                }
+
+                position += 1;
+
+                if (includeAllMetadata) {
+                    /* TupleData N */
+                    message.put("tupleData2", parseTupleData(relationId_U, buffer, position)[0]);
+                } else {
+                    /* TupleData N */
+                    message.put("tupleData", parseTupleData(relationId_U, buffer, position)[0]);
+                }
+
+                return message;
+
+            /* Identifies the message as a DELETE message. */
+            case 'D':
+
+                message.put("type", "delete");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_D = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_D);
+                }
+
+                message.put("relationName", this.relations.get(relationId_D).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                     * or as an old tuple ('O').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(position));
+                }
+
+                position += 1;
+
+                /* TupleData */
+                message.put("tupleData", parseTupleData(relationId_D, buffer, position)[0]);
+
+                return message;
+
+            default:
+
+                message.put("type", "error");
+                message.put("description", "Unknown message type \"" + msgType + "\".");
+                return message;
+        }
+    }
+
+    /**
+     * Decodes the tuple data (row) included in the binary buffer for INSERT,
+     * UPDATE and DELETE events.
+     *
+     * @param relationId
+     *                   Table ID used to get the table name from the Decoder
+     *                   relations field.
+     * @param buffer
+     *                   Binary buffer of the event.
+     * @param position
+     *                   Position at which the tuple data starts.
+     * @return Object[]
+     * @throws UnsupportedEncodingException
+     *                                      if conversion of bytes to String
+     *                                      fails
+     */
+    private Object[] parseTupleData(int relationId, ByteBuffer buffer, int position)
+            throws UnsupportedEncodingException {
+
+        HashMap<String, Object> data = new HashMap<String, Object>();
+        Object[] result = { data, position };
+
+        /* (Int16) Number of columns. */
+        short columns = buffer.getShort(position);
+        /* short = 2 bytes */
+        position += 2;
+
+        for (int i = 1; i <= columns; i++) {
+            /*
+             * (Byte1) Either identifies the data as NULL value ('n') or unchanged TOASTed
+             * value ('u') or text formatted value ('t').
+             */
+            char dataFormat = (char) buffer.get(position);
+            /* byte = 1 byte */
+            position += 1;
+
+            Column column = relations.get(relationId).getColumn(i);
+
+            if (dataFormat == 't') {
+                /* (Int32) Length of the column value. */
+                int lenValue = buffer.getInt(position);
+                /* int = 4 bytes */
+                position += 4;
+
+                buffer.position(position);
+                byte[] bytes = new byte[lenValue];
+                buffer.get(bytes);
+                /* String = length * bytes */
+                position += lenValue;
+
+                /*
+                 * (ByteN) The value of the column, in text format.
+                 * Numeric types are not quoted.
+                 */
+                if (column.getDataTypeName() != null && column.getDataTypeName().startsWith("int")) {
+                    data.put(column.getName(), Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
+                } else {
+                    /* (ByteN) The value of the column, in text format. */
+                    data.put(column.getName(), new String(bytes, StandardCharsets.UTF_8));
+                }
+
+            } else { /* dataFormat = 'n' (NULL value) or 'u' (unchanged TOASTED value) */
+                if (dataFormat == 'n') {
+                    data.put(column.getName(), null);
+                } else {
+                    data.put(column.getName(), "UTOAST");
+                }
+            }
+        }
+
+        result[0] = data;
+        result[1] = position;
+
+        return result;
+    }
+
+    /**
+     * Converts a PostgreSQL epoch value to a human-readable date format.
+     *
+     * @param microseconds
+     *                     Microseconds since 2000-01-01 00:00:00.000.
+     * @return String
+     * @throws ParseException
+     *                        if parsing of the epoch start date fails
+     */
+    private String getFormattedPostgreSQLEpochDate(long microseconds) throws ParseException {
+        Date pgEpochDate = new SimpleDateFormat("yyyy-MM-dd").parse("2000-01-01");
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(pgEpochDate);
+        cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + (int) (microseconds / 1000000));
+
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z Z").format(cal.getTime());
+    }
+
+    /**
+     * Indicates whether the dataTypes field is empty.
+     *
+     * @return boolean
+     */
+    public boolean isDataTypesEmpty() {

Review Comment:
   Public methods should be declared before private methods.
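   A minimal sketch of the suggested layout (the member names below are illustrative only, not the actual Decoder members):

   ```java
   // Hypothetical example: public API declared first, private helpers last.
   public class Example {

       /** Public methods come first. */
       public boolean isEmpty() {
           return count() == 0;
       }

       /** Private helpers follow the public API. */
       private int count() {
           return 0;
       }
   }
   ```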



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Reader.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.postgresql.PGConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationStream;
+
+/**
+ * Reader is the helper class for the PostgreSQL Replication Stream, which
+ * allows reading event changes (INSERT, UPDATE, DELETE, etc.) from a specific
+ * Publication via a Replication Slot in real time. A start position (LSN) can
+ * be indicated at object creation.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Slot
+ * @see org.apache.nifi.cdc.postgresql.event.Decoder
+ */
+public class Reader {
+    private boolean includeBeginCommit;
+    private boolean includeAllMetadata;
+    private String publication;
+
+    private Slot replicationSlot;
+    private PGReplicationStream replicationStream;
+    private Decoder replicationDecoder;
+
+    public Reader(String slot, boolean dropSlotIfExists, String publication, Long lsn, boolean includeBeginCommit,
+            boolean includeAllMetadata, Connection replicationConn, Connection queryConn)
+            throws SQLException {
+
+        try {
+            // Creating replication slot.
+            if (this.replicationSlot == null) {
+                this.replicationSlot = new Slot(slot, dropSlotIfExists, replicationConn, queryConn);
+            }
+        } catch (SQLException e) {
+            throw new SQLException("Failed to create replication slot", e);
+        }
+
+        try {
+            // Creating replication stream.
+            this.publication = publication;
+            PGConnection pgConn = replicationConn.unwrap(PGConnection.class);
+            if (lsn == null) {
+                this.replicationStream = pgConn.getReplicationAPI().replicationStream().logical()
+                        .withSlotName(this.replicationSlot.getName()).withSlotOption("proto_version", "1")
+                        .withSlotOption("publication_names", this.publication).withStatusInterval(1, TimeUnit.SECONDS)
+                        .start();
+            } else {
+                // Reading from LSN start position.
+                LogSequenceNumber startLSN = LogSequenceNumber.valueOf(lsn);
+                this.replicationStream = pgConn.getReplicationAPI().replicationStream().logical()
+                        .withSlotName(this.replicationSlot.getName()).withSlotOption("proto_version", "1")
+                        .withSlotOption("publication_names", this.publication).withStatusInterval(1, TimeUnit.SECONDS)
+                        .withStartPosition(startLSN).start();
+            }
+        } catch (SQLException e) {
+            throw new SQLException("Failed to create replication stream", e);
+        }
+
+        // Creating replication buffer decoder.
+        this.replicationDecoder = new Decoder();
+        try {
+            // Loading data types.
+            if (replicationDecoder.isDataTypesEmpty()) {
+                replicationDecoder.loadDataTypes(queryConn);
+            }
+        } catch (SQLException e) {
+            throw new SQLException("Failed to load data types", e);
+        }
+
+        this.includeBeginCommit = includeBeginCommit;
+        this.includeAllMetadata = includeAllMetadata;
+    }
+
+    /**
+     * Constructor only for unit tests.
+     *
+     * @see org.apache.nifi.cdc.postgresql.event.MockReader
+     */
+    public Reader(Long lsn, boolean includeBeginCommit, boolean includeAllMetadata) {
+    }
+
+    /**
+     * Reads a pending event from the replication slot since the last
+     * feedback. If the received buffer is null, there are no more pending
+     * events. The binary event buffer is decoded into a key-value message.
+     *
+     * @return HashMap (String, Object)
+     * @throws SQLException
+     *                      if reading the event fails
+     * @throws IOException
+     *                      if decoding the event fails
+     *
+     * @see org.apache.nifi.cdc.postgresql.event.Slot
+     * @see org.apache.nifi.cdc.postgresql.event.Decoder
+     */
+    public HashMap<String, Object> readMessage() throws SQLException, IOException {
+        ByteBuffer buffer;
+        try {
+            buffer = this.replicationStream.readPending();
+        } catch (Exception e) {
+            throw new SQLException("Failed to read pending events", e);
+        }
+
+        if (buffer == null) {
+            return null;
+        }
+
+        // Binary buffer decode process.
+        HashMap<String, Object> message;
+        try {
+            message = replicationDecoder.decodeLogicalReplicationBuffer(buffer, this.includeBeginCommit,
+                    this.includeAllMetadata);
+        } catch (Exception e) {
+            throw new IOException("Failed to decode event buffer", e);
+        }
+
+        // Messages not requested/included.
+        if (message.isEmpty()) {
+            return message;
+        }
+
+        LogSequenceNumber lsn = this.replicationStream.getLastReceiveLSN();
+        Long lsnLong = lsn.asLong();
+
+        // Some messages don't have an LSN (e.g. RELATION).
+        if (lsnLong > 0) {
+            message.put("lsn", lsnLong);
+        }
+
+        return message;
+    }
+
+    /**
+     * Converts event message to JSON String.
+     *
+     * @param message
+     *                The key-value message.
+     * @return String
+     * @throws IOException
+     *                     if converting the message fails
+     */
+    public String convertMessageToJSON(HashMap<String, Object> message) throws IOException {
+        ObjectMapper jsonMapper = new ObjectMapper();
+        try {
+            return jsonMapper.writeValueAsString(message);
+        } catch (JsonProcessingException e) {
+            throw new IOException("Failed to convert event message to JSON", e);
+        }
+    }

Review Comment:
   Recommend moving this utility method to another class since it is specific to JSON conversion.
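   For instance, a small dedicated helper along these lines (the JsonConverter name and its static API are assumptions for illustration, not part of this PR) would keep Reader focused on the replication stream:

   ```java
   import java.io.IOException;
   import java.util.HashMap;

   import com.fasterxml.jackson.core.JsonProcessingException;
   import com.fasterxml.jackson.databind.ObjectMapper;

   /** Hypothetical utility dedicated to JSON conversion of event messages. */
   public final class JsonConverter {

       private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

       private JsonConverter() {
       }

       /** Converts a key-value event message to a JSON String. */
       public static String toJson(HashMap<String, Object> message) throws IOException {
           try {
               return JSON_MAPPER.writeValueAsString(message);
           } catch (JsonProcessingException e) {
               throw new IOException("Failed to convert event message to JSON", e);
           }
       }
   }
   ```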



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the decoding
+ * output plugin (pgoutput). Decoder also uses the dataTypes field to enrich
+ * RELATION messages with the name of each column's data type, and the relations
+ * field to enrich INSERT, UPDATE and DELETE messages with the relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and
+     * returns a key-value message.
+     *
+     * @param buffer
+     *                           The encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and
+     *                           TYPE events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL epoch
+     *                                      dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if conversion of bytes to String
+     *                                      fails
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of column in the table. Index-based 1. */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_U);
+                }
+
+                message.put("relationName", this.relations.get(relationId_U).getObjectName());
+
+                /*
+                 * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                 * or as an old tuple ('O') or as a new tuple ('N').
+                 */
+                char tupleType1 = (char) buffer.get(position);
+                position += 1;
+
+                if (includeAllMetadata) {
+                    message.put("tupleType1", tupleType1);
+                }
+
+                /* TupleData N, K or O */
+                Object[] tupleData1 = parseTupleData(relationId_U, buffer, position);
+
+                if (includeAllMetadata) {
+                    message.put("tupleData1", tupleData1[0]);
+                }
+
+                if (tupleType1 == 'N') {
+                    if (!includeAllMetadata) {
+                        /* TupleData N */
+                        message.put("tupleData", tupleData1[0]);
+                    }
+
+                    return message;
+                }
+
+                position = (Integer) tupleData1[1];
+
+                if (includeAllMetadata) {
+                    char tupleType2 = (char) buffer.get(position);
+
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K') or
+                     * as an old tuple ('O') or as a new tuple ('N').
+                     */
+                    message.put("tupleType2", tupleType2);
+                }
+
+                position += 1;
+
+                if (includeAllMetadata) {
+                    /* TupleData N */
+                    message.put("tupleData2", parseTupleData(relationId_U, buffer, position)[0]);
+                } else {
+                    /* TupleData N */
+                    message.put("tupleData", parseTupleData(relationId_U, buffer, position)[0]);
+                }
+
+                return message;
+
+            /* Identifies the message as a DELETE message. */
+            case 'D':
+
+                message.put("type", "delete");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_D = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_D);
+                }
+
+                message.put("relationName", this.relations.get(relationId_D).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                     * or as an old tuple ('O').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(position));
+                }
+
+                position += 1;
+
+                /* TupleData */
+                message.put("tupleData", parseTupleData(relationId_D, buffer, position)[0]);
+
+                return message;
+
+            default:
+
+                message.put("type", "error");
+                message.put("description", "Unknown message type \"" + msgType + "\".");
+                return message;
+        }
+    }
+
+    /**
+     * Decodes the tuple data (row) included in the binary buffer for INSERT,
+     * UPDATE and DELETE events.
+     *
+     * @param relationId
+     *                   Table ID used to get the table name from the Decoder
+     *                   relations field.
+     * @param buffer
+     *                   Binary buffer of the event.
+     * @param position
+     *                   Position at which the tuple data starts.
+     * @return Object[]
+     * @throws UnsupportedEncodingException
+     *                                      if conversion of bytes to String
+     *                                      fails
+     */
+    private Object[] parseTupleData(int relationId, ByteBuffer buffer, int position)
+            throws UnsupportedEncodingException {
+
+        HashMap<String, Object> data = new HashMap<String, Object>();
+        Object[] result = { data, position };
+
+        /* (Int16) Number of columns. */
+        short columns = buffer.getShort(position);
+        /* short = 2 bytes */
+        position += 2;
+
+        for (int i = 1; i <= columns; i++) {
+            /*
+             * (Byte1) Either identifies the data as NULL value ('n') or unchanged TOASTed
+             * value ('u') or text formatted value ('t').
+             */
+            char dataFormat = (char) buffer.get(position);
+            /* byte = 1 byte */
+            position += 1;
+
+            Column column = relations.get(relationId).getColumn(i);
+
+            if (dataFormat == 't') {
+                /* (Int32) Length of the column value. */
+                int lenValue = buffer.getInt(position);
+                /* int = 4 bytes */
+                position += 4;
+
+                buffer.position(position);
+                byte[] bytes = new byte[lenValue];
+                buffer.get(bytes);
+                /* String = length * bytes */
+                position += lenValue;
+
+                /*
+                 * (ByteN) The value of the column, in text format.
+                 * Numeric types are not quoted.
+                 */
+                if (column.getDataTypeName() != null && column.getDataTypeName().startsWith("int")) {
+                    data.put(column.getName(), Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
+                } else {
+                    /* (ByteN) The value of the column, in text format. */
+                    data.put(column.getName(), new String(bytes, StandardCharsets.UTF_8));
+                }
+
+            } else { /* dataFormat = 'n' (NULL value) or 'u' (unchanged TOASTED value) */
+                if (dataFormat == 'n') {
+                    data.put(column.getName(), null);
+                } else {
+                    data.put(column.getName(), "UTOAST");
+                }
+            }
+        }
+
+        result[0] = data;
+        result[1] = position;
+
+        return result;
+    }
+
+    /**
+     * Converts a PostgreSQL epoch value to a human-readable date format.
+     *
+     * @param microseconds
+     *                     Microseconds since 2000-01-01 00:00:00.000.
+     * @return String
+     * @throws ParseException
+     *                        if parsing of the epoch start date fails
+     */
+    private String getFormattedPostgreSQLEpochDate(long microseconds) throws ParseException {
+        Date pgEpochDate = new SimpleDateFormat("yyyy-MM-dd").parse("2000-01-01");
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(pgEpochDate);
+        cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + (int) (microseconds / 1000000));
+
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z Z").format(cal.getTime());

Review Comment:
   This should be replaced with DateTimeFormatter.
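   A possible java.time sketch (the pattern mirrors the current output; using the system default zone, to match the Calendar.getInstance() behavior above, is an assumption):

   ```java
   import java.time.LocalDateTime;
   import java.time.ZoneId;
   import java.time.format.DateTimeFormatter;
   import java.time.temporal.ChronoUnit;

   class PostgreSQLEpochFormatter {

       private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z Z");

       /** Converts microseconds since the PostgreSQL epoch (2000-01-01) to a formatted date. */
       static String format(long microseconds) {
           LocalDateTime pgEpoch = LocalDateTime.of(2000, 1, 1, 0, 0);
           return FORMATTER.format(pgEpoch.plus(microseconds, ChronoUnit.MICROS).atZone(ZoneId.systemDefault()));
       }
   }
   ```

   Besides dropping the legacy API, this also avoids the narrowing int cast of the microseconds value in the current code.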



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.processors;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.postgresql.event.Reader;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+/**
+ * A processor to capture Change Data Capture (CDC) events from a PostgreSQL
+ * database and return them as flow files.
+ */
+@TriggerSerially
+@PrimaryNodeOnly
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+
+@Tags({ "sql", "jdbc", "cdc", "postgresql" })
+
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. "
+        + "Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files ordered by the time at which the operation occurred. "
+        + "This processor uses Replication Connection to stream data. By default, an existing Logical Replication Slot with the specified name will be used or, "
+        + "if none exists, a new one will be created. In the case of an existing slot, make sure that pgoutput is the output plugin. "
+        + "This processor also uses SQL Connection to query system views. In addition, a Publication in PostgreSQL database should already exist.")
+
+@Stateful(scopes = Scope.CLUSTER, description = "The last received Log Sequence Number (LSN) from Replication Slot is stored by this processor, "
+        + "such that it can continue from the same location if restarted.")
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "cdc.type", description = "The CDC event type, as begin, commit, insert, update, delete, etc."),
+        @WritesAttribute(attribute = "cdc.lsn", description = "The Log Sequence Number (i.e. strictly increasing integer value) specifying the order "
+                + "of the CDC event flow file relative to the other event flow files."),
+        @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to application/json.") })
+
+public class CaptureChangePostgreSQL extends AbstractProcessor {
+    // Constants
+    private static final String MIME_TYPE_DEFAULT = "application/json";
+    private static final String LAST_LSN_STATE_MAP_KEY = "last.received.lsn";
+    private static final String POSTGRESQL_MIN_VERSION = "10";
+    private static final String POSTGRESQL_PORT_DEFAULT = "5432";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successfully created FlowFile from CDC event.").build();
+
+    private Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-class")
+            .displayName("PostgreSQL Driver Class Name")
+            .description("The class name of the PostgreSQL database driver class")
+            .defaultValue("org.postgresql.Driver")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-locations")
+            .displayName("PostgreSQL Driver Location(s)")
+            .description(
+                    "Comma-separated list of files/folders containing the PostgreSQL driver JAR and its dependencies (if any). For example '/usr/share/java/postgresql-42.3.1.jar'")
+            .defaultValue(null)
+            .required(false)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-host")
+            .displayName("PostgreSQL Hostname")
+            .description("The hostname for PostgreSQL connections.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-port")
+            .displayName("PostgreSQL Port")
+            .description("The default port for PostgreSQL connections.")
+            .required(true)
+            .defaultValue(POSTGRESQL_PORT_DEFAULT)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-database")
+            .displayName("Database")
+            .description("Specifies the name of the database to connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-user")
+            .displayName("Username")
+            .description("Username to access the PostgreSQL database. Must be a superuser.")
+            .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-password")
+            .displayName("Password")
+            .description("Password to access the PostgreSQL database.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-max-wait-time").displayName("Max Wait Time")
+            .description(
+                    "The maximum amount of time allowed for a connection to be established, zero means there is effectively no limit.")
+            .defaultValue("30 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-publication")
+            .displayName("Publication Name")
+            .description(
+                    "A group of tables whose data changes are intended to be replicated through Logical Replication. "
+                            + "It should be created in the database before the processor starts.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor REPLICATION_SLOT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-slot-name")
+            .displayName("Replication Slot Name")
+            .description(
+                    "A unique, cluster-wide identifier for the PostgreSQL Replication Slot. "
+                            + "If it already exists, make sure that pgoutput is the output plugin.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DROP_SLOT_IF_EXISTS = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-drop-slot-if-exists")
+            .displayName("Drop if exists replication slot?")
+            .description(
+                    "Drop the Replication Slot in PostgreSQL database if it already exists every processor starts.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_BEGIN_COMMIT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-begin-commit")
+            .displayName("Include BEGIN and COMMIT statements?")
+            .description(
+                    "Specifies whether to emit events including BEGIN and COMMIT statements. Set to true if the BEGIN and COMMIT statements are necessary in the downstream flow, "
+                            + "otherwise set to false, which suppresses these statements and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ALL_METADATA = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-all-metadata")
+            .displayName("Include all metadata?")
+            .description(
+                    "Specifies whether to emit events including all message types (BEGIN, COMMIT, RELATION, TYPE, etc) and all metadata (relation id, tuple type, tuple data before update, etc). "
+                            + "Set to true if all metadata are necessary in the downstream flow, otherwise set to false, which suppresses these metadata and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor START_LSN = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-start-lsn")
+            .displayName("Start Log Sequence Number (LSN)")
+            .description(
+                    "Specifies a start Log Sequence Number (LSN) to use if this processor's state does not have a current "
+                            + "sequence identifier. If a LSN is present in the processor's state, this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .build();
+
+    // Key Attributes
+    private List<PropertyDescriptor> descriptors;
+    private JDBCConnectionHolder queryConnHolder = null;
+    private JDBCConnectionHolder replicationConnHolder = null;
+    private volatile Long lastLSN = null;
+    protected Reader replicationReader = null;
+    protected Long maxFlowFileListSize = 100L;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(DRIVER_NAME);
+        descriptors.add(DRIVER_LOCATION);
+        descriptors.add(HOST);
+        descriptors.add(PORT);
+        descriptors.add(DATABASE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECTION_TIMEOUT);
+        descriptors.add(PUBLICATION);
+        descriptors.add(REPLICATION_SLOT);
+        descriptors.add(DROP_SLOT_IF_EXISTS);
+        descriptors.add(INCLUDE_BEGIN_COMMIT);
+        descriptors.add(INCLUDE_ALL_METADATA);
+        descriptors.add(START_LSN);
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    // Runs the initial configuration of the processor.
+    public void setup(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error(
+                    "Failed to retrieve observed maximum values from the State Manager. Will not attempt connection until this is accomplished.",
+                    ioe);
+            context.yield();
+            return;
+        }
+
+        final String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
+        final String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
+
+        final String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
+                .asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final String publication = context.getProperty(PUBLICATION).evaluateAttributeExpressions().getValue();
+        final String slot = context.getProperty(REPLICATION_SLOT).evaluateAttributeExpressions().getValue();
+        final Boolean dropSlotIfExists = context.getProperty(DROP_SLOT_IF_EXISTS).asBoolean();
+        final Boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        final Boolean includeAllMetadata = context.getProperty(INCLUDE_ALL_METADATA).asBoolean();
+        final Long startLSN = context.getProperty(START_LSN).asLong();
+
+        if (stateMap.get(LAST_LSN_STATE_MAP_KEY) != null && Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY)) > 0L) {
+            this.lastLSN = Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY));
+        } else if (startLSN != null) {
+            this.lastLSN = startLSN;
+        }
+
+        try {
+            this.connect(host, port, database, username, password, driverLocation, driverName, connectionTimeout);
+            this.createReplicationReader(slot, dropSlotIfExists, publication, lastLSN, includeBeginCommit,
+                    includeAllMetadata, this.getReplicationConnection(), this.getQueryConnection());
+
+        } catch (final IOException | TimeoutException | SQLException | RuntimeException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }
+
+        try {
+            List<FlowFile> listFlowFiles = new ArrayList<>();
+
+            while (listFlowFiles.size() < this.getMaxFlowFileListSize()) {
+                HashMap<String, Object> message = this.replicationReader.readMessage();
+
+                if (message == null) { // No more messages.
+                    break;
+                }
+
+                if (message.isEmpty()) { // Skip empty messages.
+                    continue;
+                }
+
+                String data = this.replicationReader.convertMessageToJSON(message);
+
+                FlowFile flowFile = session.create();
+
+                flowFile = session.write(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        out.write(data.getBytes(StandardCharsets.UTF_8));
+                    }
+                });
+
+                flowFile = session.putAttribute(flowFile, "cdc.type", message.get("type").toString());
+
+                // Some messages don't have an LSN (e.g. Relation).
+                if (message.containsKey("lsn")) {
+                    flowFile = session.putAttribute(flowFile, "cdc.lsn", message.get("lsn").toString());
+                }
+
+                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_DEFAULT);
+
+                listFlowFiles.add(flowFile);
+            }
+
+            session.transfer(listFlowFiles, REL_SUCCESS);
+
+            this.lastLSN = this.replicationReader.getLastReceiveLSN();
+
+            // Feedback is sent after the flowfiles transfer completes.
+            this.replicationReader.sendFeedback(this.lastLSN);
+
+            updateState(context.getStateManager());

Review Comment:
   The current ProcessSession implementation does not actually transfer FlowFiles until the Session is committed. It looks like this should be adjusted to use ProcessSession.commitAsync() with a callback to send feedback and update the state after a successful commit.
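
   A rough sketch of that shape, assuming the existing helpers (`getLastReceiveLSN`, `sendFeedback`, `updateState`) keep their current signatures:

   ```java
   session.transfer(listFlowFiles, REL_SUCCESS);

   final Long receivedLSN = this.replicationReader.getLastReceiveLSN();
   session.commitAsync(() -> {
       // Runs only after the framework confirms the commit, so the slot's
       // confirmed LSN never runs ahead of the FlowFiles actually transferred.
       try {
           this.lastLSN = receivedLSN;
           this.replicationReader.sendFeedback(receivedLSN);
           updateState(context.getStateManager());
       } catch (final Exception e) {
           getLogger().error("Failed to send replication feedback and update state after commit", e);
       }
   });
   ```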



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.processors;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.postgresql.event.Reader;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+/**
+ * A processor to Capture Data Change (CDC) events from a PostgreSQL database
+ * and return them as flow files.
+ */
+@TriggerSerially
+@PrimaryNodeOnly
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+
+@Tags({ "sql", "jdbc", "cdc", "postgresql" })
+
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. "
+        + "Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files ordered by the time at which the operation occurred. "
+        + "This processor uses Replication Connection to stream data. By default, an existing Logical Replication Slot with the specified name will be used or, "
+        + "if none exists, a new one will be created. In the case of an existing slot, make sure that pgoutput is the output plugin. "
+        + "This processor also uses SQL Connection to query system views. In addition, a Publication in PostgreSQL database should already exist.")
+
+@Stateful(scopes = Scope.CLUSTER, description = "The last received Log Sequence Number (LSN) from Replication Slot is stored by this processor, "
+        + "such that it can continue from the same location if restarted.")
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "cdc.type", description = "The CDC event type, as begin, commit, insert, update, delete, etc."),
+        @WritesAttribute(attribute = "cdc.lsn", description = "The Log Sequence Number (i.e. strictly increasing integer value) specifying the order "
+                + "of the CDC event flow file relative to the other event flow files."),
+        @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to application/json.") })
+
+public class CaptureChangePostgreSQL extends AbstractProcessor {
+    // Constants
+    private static final String MIME_TYPE_DEFAULT = "application/json";
+    private static final String LAST_LSN_STATE_MAP_KEY = "last.received.lsn";
+    private static final String POSTGRESQL_MIN_VERSION = "10";
+    private static final String POSTGRESQL_PORT_DEFAULT = "5432";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successfully created FlowFile from CDC event.").build();
+
+    private Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-class")
+            .displayName("PostgreSQL Driver Class Name")
+            .description("The class name of the PostgreSQL database driver class")
+            .defaultValue("org.postgresql.Driver")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-locations")
+            .displayName("PostgreSQL Driver Location(s)")
+            .description(
+                    "Comma-separated list of files/folders containing the PostgreSQL driver JAR and its dependencies (if any). For example '/usr/share/java/postgresql-42.3.1.jar'")
+            .defaultValue(null)
+            .required(false)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-host")
+            .displayName("PostgreSQL Hostname")
+            .description("The hostname for PostgreSQL connections.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-port")
+            .displayName("PostgreSQL Port")
+            .description("The default port for PostgreSQL connections.")
+            .required(true)
+            .defaultValue(POSTGRESQL_PORT_DEFAULT)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-database")
+            .displayName("Database")
+            .description("Specifies the name of the database to connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-user")
+            .displayName("Username")
+            .description("Username to access the PostgreSQL database. Must be a superuser.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-password")
+            .displayName("Password")
+            .description("Password to access the PostgreSQL database.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-max-wait-time").displayName("Max Wait Time")
+            .description(
+                    "The maximum amount of time allowed for a connection to be established, zero means there is effectively no limit.")
+            .defaultValue("30 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-publication")
+            .displayName("Publication Name")
+            .description(
+                    "A group of tables whose data changes are intended to be replicated through Logical Replication. "
+                            + "It should be created in the database before the processor starts.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor REPLICATION_SLOT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-slot-name")
+            .displayName("Replication Slot Name")
+            .description(
+                    "A unique, cluster-wide identifier for the PostgreSQL Replication Slot. "
+                            + "If it already exists, make sure that pgoutput is the output plugin.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DROP_SLOT_IF_EXISTS = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-drop-slot-if-exists")
+            .displayName("Drop if exists replication slot?")
+            .description(
+                    "Drop the Replication Slot in PostgreSQL database if it already exists every processor starts.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_BEGIN_COMMIT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-begin-commit")
+            .displayName("Include BEGIN and COMMIT statements?")
+            .description(
+                    "Specifies whether to emit events including BEGIN and COMMIT statements. Set to true if the BEGIN and COMMIT statements are necessary in the downstream flow, "
+                            + "otherwise set to false, which suppresses these statements and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ALL_METADATA = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-all-metadata")
+            .displayName("Include all metadata?")
+            .description(
+                    "Specifies whether to emit events including all message types (BEGIN, COMMIT, RELATION, TYPE, etc) and all metadata (relation id, tuple type, tuple data before update, etc). "
+                            + "Set to true if all metadata are necessary in the downstream flow, otherwise set to false, which suppresses these metadata and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor START_LSN = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-start-lsn")
+            .displayName("Start Log Sequence Number (LSN)")
+            .description(
+                    "Specifies a start Log Sequence Number (LSN) to use if this processor's state does not have a current "
+                            + "sequence identifier. If a LSN is present in the processor's state, this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .build();
+
+    // Key Attributes
+    private List<PropertyDescriptor> descriptors;
+    private JDBCConnectionHolder queryConnHolder = null;
+    private JDBCConnectionHolder replicationConnHolder = null;
+    private volatile Long lastLSN = null;
+    protected Reader replicationReader = null;
+    protected Long maxFlowFileListSize = 100L;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(DRIVER_NAME);
+        descriptors.add(DRIVER_LOCATION);
+        descriptors.add(HOST);
+        descriptors.add(PORT);
+        descriptors.add(DATABASE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECTION_TIMEOUT);
+        descriptors.add(PUBLICATION);
+        descriptors.add(REPLICATION_SLOT);
+        descriptors.add(DROP_SLOT_IF_EXISTS);
+        descriptors.add(INCLUDE_BEGIN_COMMIT);
+        descriptors.add(INCLUDE_ALL_METADATA);
+        descriptors.add(START_LSN);
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    // Runs the initial configuration of the processor.
+    public void setup(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error(
+                    "Failed to retrieve observed maximum values from the State Manager. Will not attempt connection until this is accomplished.",
+                    ioe);
+            context.yield();
+            return;
+        }
+
+        final String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
+        final String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
+
+        final String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
+                .asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final String publication = context.getProperty(PUBLICATION).evaluateAttributeExpressions().getValue();
+        final String slot = context.getProperty(REPLICATION_SLOT).evaluateAttributeExpressions().getValue();
+        final Boolean dropSlotIfExists = context.getProperty(DROP_SLOT_IF_EXISTS).asBoolean();
+        final Boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        final Boolean includeAllMetadata = context.getProperty(INCLUDE_ALL_METADATA).asBoolean();
+        final Long startLSN = context.getProperty(START_LSN).asLong();
+
+        if (stateMap.get(LAST_LSN_STATE_MAP_KEY) != null && Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY)) > 0L) {
+            this.lastLSN = Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY));
+        } else if (startLSN != null) {
+            this.lastLSN = startLSN;
+        }
+
+        try {
+            this.connect(host, port, database, username, password, driverLocation, driverName, connectionTimeout);
+            this.createReplicationReader(slot, dropSlotIfExists, publication, lastLSN, includeBeginCommit,
+                    includeAllMetadata, this.getReplicationConnection(), this.getQueryConnection());
+
+        } catch (final IOException | TimeoutException | SQLException | RuntimeException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }
+
+        try {
+            List<FlowFile> listFlowFiles = new ArrayList<>();
+
+            while (listFlowFiles.size() < this.getMaxFlowFileListSize()) {
+                HashMap<String, Object> message = this.replicationReader.readMessage();
+
+                if (message == null) { // No more messages.
+                    break;
+                }
+
+                if (message.isEmpty()) { // Skip empty messages.
+                    continue;
+                }
+
+                String data = this.replicationReader.convertMessageToJSON(message);

Review Comment:
   Rather than hard-coding this implementation to JSON, this would be a good opportunity to introduce a RecordWriter Service property. That would support writing messages as JSON, or any number of other supported Record formats. Using a record-oriented approach would also make the flow design much more efficient, allowing multiple messages to be packed into a single FlowFile.
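
   For illustration, a minimal sketch of that direction (the property name, the `schema`/`records` variables, and the per-message Record conversion are placeholders, not something the PR already provides):

   ```java
   // Assumes org.apache.nifi.serialization.RecordSetWriter / RecordSetWriterFactory,
   // org.apache.nifi.serialization.record.Record / RecordSchema,
   // and org.apache.nifi.schema.access.SchemaNotFoundException are imported.
   public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
           .name("cdc-postgresql-record-writer")
           .displayName("Record Writer")
           .description("The Record Writer to use for serializing CDC events.")
           .identifiesControllerService(RecordSetWriterFactory.class)
           .required(true)
           .build();

   // In onTrigger: 'schema' (RecordSchema) and 'records' (List<Record>) would be
   // built from the replication messages, then a whole batch written to one FlowFile.
   final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
           .asControllerService(RecordSetWriterFactory.class);

   FlowFile flowFile = session.create();
   flowFile = session.write(flowFile, out -> {
       try (RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, Collections.emptyMap())) {
           writer.beginRecordSet();
           for (Record record : records) { // one Record per replication message
               writer.write(record);
           }
           writer.finishRecordSet();
       } catch (SchemaNotFoundException e) {
           throw new IOException(e);
       }
   });
   ```

   Besides supporting other formats, batching records like this avoids the per-event FlowFile overhead.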



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.processors;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.postgresql.event.Reader;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+/**
+ * A processor to Capture Data Change (CDC) events from a PostgreSQL database
+ * and return them as flow files.
+ */
+@TriggerSerially
+@PrimaryNodeOnly
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+
+@Tags({ "sql", "jdbc", "cdc", "postgresql" })
+
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. "
+        + "Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files ordered by the time at which the operation occurred. "
+        + "This processor uses Replication Connection to stream data. By default, an existing Logical Replication Slot with the specified name will be used or, "
+        + "if none exists, a new one will be created. In the case of an existing slot, make sure that pgoutput is the output plugin. "
+        + "This processor also uses SQL Connection to query system views. In addition, a Publication in PostgreSQL database should already exist.")
+
+@Stateful(scopes = Scope.CLUSTER, description = "The last received Log Sequence Number (LSN) from Replication Slot is stored by this processor, "
+        + "such that it can continue from the same location if restarted.")
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "cdc.type", description = "The CDC event type, as begin, commit, insert, update, delete, etc."),
+        @WritesAttribute(attribute = "cdc.lsn", description = "The Log Sequence Number (i.e. strictly increasing integer value) specifying the order "
+                + "of the CDC event flow file relative to the other event flow files."),
+        @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to application/json.") })
+
+public class CaptureChangePostgreSQL extends AbstractProcessor {
+    // Constants
+    private static final String MIME_TYPE_DEFAULT = "application/json";
+    private static final String LAST_LSN_STATE_MAP_KEY = "last.received.lsn";
+    private static final String POSTGRESQL_MIN_VERSION = "10";
+    private static final String POSTGRESQL_PORT_DEFAULT = "5432";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successfully created FlowFile from CDC event.").build();
+
+    private Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-class")
+            .displayName("PostgreSQL Driver Class Name")
+            .description("The class name of the PostgreSQL database driver class")
+            .defaultValue("org.postgresql.Driver")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-locations")
+            .displayName("PostgreSQL Driver Location(s)")
+            .description(
+                    "Comma-separated list of files/folders containing the PostgreSQL driver JAR and its dependencies (if any). For example '/usr/share/java/postgresql-42.3.1.jar'")
+            .defaultValue(null)
+            .required(false)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-host")
+            .displayName("PostgreSQL Hostname")
+            .description("The hostname for PostgreSQL connections.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-port")
+            .displayName("PostgreSQL Port")
+            .description("The default port for PostgreSQL connections.")
+            .required(true)
+            .defaultValue(POSTGRESQL_PORT_DEFAULT)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-database")
+            .displayName("Database")
+            .description("Specifies the name of the database to connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-user")
+            .displayName("Username")
+            .description("Username to access the PostgreSQL database. Must be a superuser.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-password")
+            .displayName("Password")
+            .description("Password to access the PostgreSQL database.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-max-wait-time").displayName("Max Wait Time")
+            .description(
+                    "The maximum amount of time allowed for a connection to be established, zero means there is effectively no limit.")
+            .defaultValue("30 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-publication")
+            .displayName("Publication Name")
+            .description(
+                    "A group of tables whose data changes are intended to be replicated through Logical Replication. "
+                            + "It should be created in the database before the processor starts.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor REPLICATION_SLOT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-slot-name")
+            .displayName("Replication Slot Name")
+            .description(
+                    "A unique, cluster-wide identifier for the PostgreSQL Replication Slot. "
+                            + "If it already exists, make sure that pgoutput is the output plugin.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DROP_SLOT_IF_EXISTS = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-drop-slot-if-exists")
+            .displayName("Drop if exists replication slot?")
+            .description(
+                    "Drop the Replication Slot in PostgreSQL database if it already exists every processor starts.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_BEGIN_COMMIT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-begin-commit")
+            .displayName("Include BEGIN and COMMIT statements?")
+            .description(
+                    "Specifies whether to emit events including BEGIN and COMMIT statements. Set to true if the BEGIN and COMMIT statements are necessary in the downstream flow, "
+                            + "otherwise set to false, which suppresses these statements and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ALL_METADATA = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-all-metadata")
+            .displayName("Include all metadata?")
+            .description(
+                    "Specifies whether to emit events including all message types (BEGIN, COMMIT, RELATION, TYPE, etc) and all metadata (relation id, tuple type, tuple data before update, etc). "
+                            + "Set to true if all metadata are necessary in the downstream flow, otherwise set to false, which suppresses these metadata and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor START_LSN = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-start-lsn")
+            .displayName("Start Log Sequence Number (LSN)")
+            .description(
+                    "Specifies a start Log Sequence Number (LSN) to use if this processor's state does not have a current "
+                            + "sequence identifier. If a LSN is present in the processor's state, this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .build();
+
+    // Key Attributes
+    private List<PropertyDescriptor> descriptors;
+    private JDBCConnectionHolder queryConnHolder = null;
+    private JDBCConnectionHolder replicationConnHolder = null;
+    private volatile Long lastLSN = null;
+    protected Reader replicationReader = null;
+    protected Long maxFlowFileListSize = 100L;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(DRIVER_NAME);
+        descriptors.add(DRIVER_LOCATION);
+        descriptors.add(HOST);
+        descriptors.add(PORT);
+        descriptors.add(DATABASE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECTION_TIMEOUT);
+        descriptors.add(PUBLICATION);
+        descriptors.add(REPLICATION_SLOT);
+        descriptors.add(DROP_SLOT_IF_EXISTS);
+        descriptors.add(INCLUDE_BEGIN_COMMIT);
+        descriptors.add(INCLUDE_ALL_METADATA);
+        descriptors.add(START_LSN);
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    // Runs the initial configuration of the processor.
+    public void setup(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error(
+                    "Failed to retrieve observed maximum values from the State Manager. Will not attempt connection until this is accomplished.",
+                    ioe);
+            context.yield();
+            return;
+        }
+
+        final String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
+        final String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
+
+        final String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
+                .asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final String publication = context.getProperty(PUBLICATION).evaluateAttributeExpressions().getValue();
+        final String slot = context.getProperty(REPLICATION_SLOT).evaluateAttributeExpressions().getValue();
+        final Boolean dropSlotIfExists = context.getProperty(DROP_SLOT_IF_EXISTS).asBoolean();
+        final Boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        final Boolean includeAllMetadata = context.getProperty(INCLUDE_ALL_METADATA).asBoolean();
+        final Long startLSN = context.getProperty(START_LSN).asLong();
+
+        if (stateMap.get(LAST_LSN_STATE_MAP_KEY) != null && Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY)) > 0L) {
+            this.lastLSN = Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY));
+        } else if (startLSN != null) {
+            this.lastLSN = startLSN;
+        }
+
+        try {
+            this.connect(host, port, database, username, password, driverLocation, driverName, connectionTimeout);
+            this.createReplicationReader(slot, dropSlotIfExists, publication, lastLSN, includeBeginCommit,
+                    includeAllMetadata, this.getReplicationConnection(), this.getQueryConnection());
+
+        } catch (final IOException | TimeoutException | SQLException | RuntimeException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }

Review Comment:
   This null check and call to `setup()` does not appear to be thread safe. Did you consider annotating the `setup()` method with `OnScheduled` instead of this approach?
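
   For example (sketch only; the body would be the existing connection and replication-reader initialization):

   ```java
   import org.apache.nifi.annotation.lifecycle.OnScheduled;

   @OnScheduled
   public void setup(final ProcessContext context) {
       // connect and create the replication reader once, before any
       // onTrigger invocation, rather than lazily from onTrigger
   }
   ```

   The check in `onTrigger()` could then be limited to detecting a dropped replication stream and reconnecting.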



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.processors;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.postgresql.event.Reader;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+/**
+ * A processor to Capture Data Change (CDC) events from a PostgreSQL database
+ * and return them as flow files.
+ */
+@TriggerSerially
+@PrimaryNodeOnly
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+
+@Tags({ "sql", "jdbc", "cdc", "postgresql" })
+
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. "
+        + "Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files ordered by the time at which the operation occurred. "
+        + "This processor uses Replication Connection to stream data. By default, an existing Logical Replication Slot with the specified name will be used or, "
+        + "if none exists, a new one will be created. In the case of an existing slot, make sure that pgoutput is the output plugin. "
+        + "This processor also uses SQL Connection to query system views. In addition, a Publication in PostgreSQL database should already exist.")
+
+@Stateful(scopes = Scope.CLUSTER, description = "The last received Log Sequence Number (LSN) from Replication Slot is stored by this processor, "
+        + "such that it can continue from the same location if restarted.")
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "cdc.type", description = "The CDC event type, as begin, commit, insert, update, delete, etc."),
+        @WritesAttribute(attribute = "cdc.lsn", description = "The Log Sequence Number (i.e. strictly increasing integer value) specifying the order "
+                + "of the CDC event flow file relative to the other event flow files."),
+        @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to application/json.") })
+
+public class CaptureChangePostgreSQL extends AbstractProcessor {
+    // Constants
+    private static final String MIME_TYPE_DEFAULT = "application/json";
+    private static final String LAST_LSN_STATE_MAP_KEY = "last.received.lsn";
+    private static final String POSTGRESQL_MIN_VERSION = "10";
+    private static final String POSTGRESQL_PORT_DEFAULT = "5432";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successfully created FlowFile from CDC event.").build();
+
+    private Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-class")
+            .displayName("PostgreSQL Driver Class Name")
+            .description("The class name of the PostgreSQL database driver class")
+            .defaultValue("org.postgresql.Driver")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-locations")
+            .displayName("PostgreSQL Driver Location(s)")
+            .description(
+                    "Comma-separated list of files/folders containing the PostgreSQL driver JAR and its dependencies (if any). For example '/usr/share/java/postgresql-42.3.1.jar'")
+            .defaultValue(null)
+            .required(false)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-host")
+            .displayName("PostgreSQL Hostname")
+            .description("The hostname for PostgreSQL connections.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-port")
+            .displayName("PostgreSQL Port")
+            .description("The default port for PostgreSQL connections.")
+            .required(true)
+            .defaultValue(POSTGRESQL_PORT_DEFAULT)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-database")
+            .displayName("Database")
+            .description("Specifies the name of the database to connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-user")
+            .displayName("Username")
+            .description("Username to access the PostgreSQL database. Must be a superuser.")
+            .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-password")
+            .displayName("Password")
+            .description("Password to access the PostgreSQL database.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-max-wait-time").displayName("Max Wait Time")
+            .description(
+                    "The maximum amount of time allowed for a connection to be established; zero means there is effectively no limit.")
+            .defaultValue("30 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
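+    // The publication is expected to be created up front, e.g. (illustrative name):
+    //   CREATE PUBLICATION nifi_pub FOR TABLE table1, table2;  -- or FOR ALL TABLES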
+    public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-publication")
+            .displayName("Publication Name")
+            .description(
+                    "A group of tables whose data changes are intended to be replicated through Logical Replication. "
+                            + "It should be created in the database before the processor starts.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
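+    // If the slot is created manually rather than by this processor, it must use the
+    // pgoutput plugin, e.g. (illustrative name):
+    //   SELECT pg_create_logical_replication_slot('nifi_slot', 'pgoutput');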
+    public static final PropertyDescriptor REPLICATION_SLOT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-slot-name")
+            .displayName("Replication Slot Name")
+            .description(
+                    "A unique, cluster-wide identifier for the PostgreSQL Replication Slot. "
+                            + "If it already exists, make sure that pgoutput is the output plugin.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DROP_SLOT_IF_EXISTS = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-drop-slot-if-exists")
+            .displayName("Drop if exists replication slot?")
+            .description(
+                    "Drop the Replication Slot in PostgreSQL database if it already exists every processor starts.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_BEGIN_COMMIT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-begin-commit")
+            .displayName("Include BEGIN and COMMIT statements?")
+            .description(
+                    "Specifies whether to emit events including BEGIN and COMMIT statements. Set to true if the BEGIN and COMMIT statements are necessary in the downstream flow, "
+                            + "otherwise set to false, which suppresses these statements and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ALL_METADATA = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-all-metadata")
+            .displayName("Include all metadata?")
+            .description(
+                    "Specifies whether to emit events including all message types (BEGIN, COMMIT, RELATION, TYPE, etc) and all metadata (relation id, tuple type, tuple data before update, etc). "
+                            + "Set to true if all metadata are necessary in the downstream flow, otherwise set to false, which suppresses these metadata and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
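+    // LSNs are handled here as 64-bit integers: a textual pg_lsn such as '16/B374D848'
+    // corresponds to (0x16L << 32) | 0xB374D848L.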
+    public static final PropertyDescriptor START_LSN = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-start-lsn")
+            .displayName("Start Log Sequence Number (LSN)")
+            .description(
+                    "Specifies a start Log Sequence Number (LSN) to use if this processor's state does not have a current "
+                            + "sequence identifier. If a LSN is present in the processor's state, this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .build();
+
+    // Key Attributes
+    private List<PropertyDescriptor> descriptors;
+    private JDBCConnectionHolder queryConnHolder = null;
+    private JDBCConnectionHolder replicationConnHolder = null;
+    private volatile Long lastLSN = null;
+    protected Reader replicationReader = null;
+    protected Long maxFlowFileListSize = 100L;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(DRIVER_NAME);
+        descriptors.add(DRIVER_LOCATION);
+        descriptors.add(HOST);
+        descriptors.add(PORT);
+        descriptors.add(DATABASE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECTION_TIMEOUT);
+        descriptors.add(PUBLICATION);
+        descriptors.add(REPLICATION_SLOT);
+        descriptors.add(DROP_SLOT_IF_EXISTS);
+        descriptors.add(INCLUDE_BEGIN_COMMIT);
+        descriptors.add(INCLUDE_ALL_METADATA);
+        descriptors.add(START_LSN);
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    // Runs the initial configuration of the processor.
+    public void setup(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error(
+                    "Failed to retrieve the last received LSN from the State Manager. Will not attempt connection until this is accomplished.",
+                    ioe);
+            context.yield();
+            return;
+        }
+
+        final String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
+        final String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
+
+        final String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
+                .asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final String publication = context.getProperty(PUBLICATION).evaluateAttributeExpressions().getValue();
+        final String slot = context.getProperty(REPLICATION_SLOT).evaluateAttributeExpressions().getValue();
+        final Boolean dropSlotIfExists = context.getProperty(DROP_SLOT_IF_EXISTS).asBoolean();
+        final Boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        final Boolean includeAllMetadata = context.getProperty(INCLUDE_ALL_METADATA).asBoolean();
+        final Long startLSN = context.getProperty(START_LSN).asLong();
+
+        if (stateMap.get(LAST_LSN_STATE_MAP_KEY) != null && Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY)) > 0L) {
+            this.lastLSN = Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY));
+        } else {
+            if (startLSN != null) {
+                this.lastLSN = startLSN;
+            }
+        }
+
+        try {
+            this.connect(host, port, database, username, password, driverLocation, driverName, connectionTimeout);
+            this.createReplicationReader(slot, dropSlotIfExists, publication, lastLSN, includeBeginCommit,
+                    includeAllMetadata, this.getReplicationConnection(), this.getQueryConnection());
+
+        } catch (final IOException | TimeoutException | SQLException | RuntimeException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }
+
+        try {
+            List<FlowFile> listFlowFiles = new ArrayList<>();
+
+            while (listFlowFiles.size() < this.getMaxFlowFileListSize()) {
+                HashMap<String, Object> message = this.replicationReader.readMessage();
+
+                if (message == null) { // No more messages.
+                    break;
+                }
+
+                if (message.isEmpty()) { // Skip empty messages.
+                    continue;
+                }
+
+                String data = this.replicationReader.convertMessageToJSON(message);
+
+                FlowFile flowFile = session.create();
+
+                flowFile = session.write(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        out.write(data.getBytes(StandardCharsets.UTF_8));
+                    }
+                });
+
+                flowFile = session.putAttribute(flowFile, "cdc.type", message.get("type").toString());
+
+                // Some messages don't have an LSN (e.g. Relation).
+                if (message.containsKey("lsn")) {
+                    flowFile = session.putAttribute(flowFile, "cdc.lsn", message.get("lsn").toString());
+                }
+
+                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_DEFAULT);
+
+                listFlowFiles.add(flowFile);
+            }
+
+            session.transfer(listFlowFiles, REL_SUCCESS);
+
+            this.lastLSN = this.replicationReader.getLastReceiveLSN();
+
+            // Feedback is sent after the FlowFiles transfer completes.
+            this.replicationReader.sendFeedback(this.lastLSN);
+
+            updateState(context.getStateManager());
+
+        } catch (SQLException | IOException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    // Updates the last received LSN in the processor state.
+    private void updateState(StateManager stateManager) throws IOException {
+        if (stateManager != null) {
+            Map<String, String> newStateMap = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
+
+            newStateMap.put(LAST_LSN_STATE_MAP_KEY, String.valueOf(this.lastLSN));
+            stateManager.setState(newStateMap, Scope.CLUSTER);
+        }
+    }
+
+    protected void stop(StateManager stateManager) throws CDCException {
+        try {
+            this.replicationReader = null;
+
+            if (lastLSN != null) {
+                updateState(stateManager);
+            }
+
+            if (queryConnHolder != null) {
+                queryConnHolder.close();
+            }
+
+            if (replicationConnHolder != null) {
+                replicationConnHolder.close();
+            }
+        } catch (Exception e) {
+            throw new CDCException("Closing CDC Connection failed", e);
+        }
+    }
+
+    @OnStopped
+    public void onStopped(ProcessContext context) {
+        try {
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown(ProcessContext context) {
+        try {
+            // In case we get shutdown while still running, save off the current state,
+            // disconnect, and shut down gracefully.
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    protected void createReplicationReader(String slot, boolean dropSlotIfExists, String publication, Long lsn,
+            boolean includeBeginCommit, boolean includeAllMetadata, Connection replicationConn,
+            Connection queryConn) throws SQLException {
+        this.replicationReader = new Reader(slot, dropSlotIfExists, publication, lsn, includeBeginCommit,
+                includeAllMetadata, replicationConn, queryConn);
+    }
+
+    protected Long getMaxFlowFileListSize() {
+        return this.maxFlowFileListSize;
+    }
+
+    protected void connect(String host, String port, String database, String username, String password,
+            String driverLocation, String driverName, long connectionTimeout)
+            throws IOException, TimeoutException {
+        try {
+            // Ensure driverLocation and driverName are correct
+            // before establishing connection.
+            registerDriver(driverLocation, driverName);
+        } catch (InitializationException e) {
+            throw new RuntimeException(
+                    "Failed to register JDBC driver. Ensure PostgreSQL Driver Location(s) and PostgreSQL Driver Class Name "
+                            + "are configured correctly",
+                    e);
+        }
+
+        // Connection expects a non-null password.
+        if (password == null) {
+            password = "";
+        }
+
+        // Connection expects a timeout.
+        if (connectionTimeout == 0) {
+            connectionTimeout = Long.MAX_VALUE;
+        }
+
+        InetSocketAddress address = getAddress(host, port);
+
+        queryConnHolder = new JDBCConnectionHolder(address, database, username, password, false, connectionTimeout);
+        try {
+            // Ensure Query connection can be created.
+            this.getQueryConnection();
+        } catch (SQLException e) {
+            throw new IOException("Failed to create SQL connection to specified host and port", e);
+        }
+
+        replicationConnHolder = new JDBCConnectionHolder(address, database, username, password, true,
+                connectionTimeout);
+        try {
+            // Ensure Replication connection can be created.
+            this.getReplicationConnection();
+        } catch (SQLException e) {
+            throw new IOException("Failed to create Replication connection to specified host and port", e);
+        }
+    }
+
+    protected Connection getQueryConnection() throws SQLException {
+        return queryConnHolder.getConnection();
+    }
+
+    protected Connection getReplicationConnection() throws SQLException {
+        return replicationConnHolder.getConnection();
+    }
+
+    protected void closeQueryConnection() {
+        if (queryConnHolder != null) {
+            queryConnHolder.close();
+        }
+    }
+
+    protected void closeReplicationConnection() {
+        if (replicationConnHolder != null) {
+            replicationConnHolder.close();
+        }
+    }
+
+    /**
+     * Ensures that the ClassLoader for your NAR is used when loading the driver.
+     *
+     * @param driverLocation
+     *                       Driver's location.
+     *
+     * @param driverName
+     *                       Driver's class name.
+     *
+     * @throws InitializationException
+     *                                 if there is a problem obtaining the
+     *                                 ClassLoader
+     **/
+    protected void registerDriver(String driverLocation, String driverName) throws InitializationException {
+        if (driverLocation != null && driverLocation.length() > 0) {
+            try {
+                // Split and trim the entries
+                final ClassLoader classLoader = ClassLoaderUtils.getCustomClassLoader(driverLocation,
+                        this.getClass().getClassLoader(), (dir, name) -> name != null && name.endsWith(".jar"));
+
+                // Class.forName throws ClassNotFoundException rather than returning null,
+                // so no null check is needed here.
+                final Class<?> clazz = Class.forName(driverName, true, classLoader);
+                final Driver driver = (Driver) clazz.getDeclaredConstructor().newInstance();
+                DriverManager.registerDriver(new DriverShim(driver));
+
+            } catch (final InitializationException e) {
+                throw e;
+            } catch (final MalformedURLException e) {
+                throw new InitializationException("Invalid Database Driver Jar Url", e);
+            } catch (final Exception e) {
+                throw new InitializationException("Can't load Database Driver", e);
+            }
+        }
+    }
+
+    /**
+     * Returns the Address from HOST and PORT properties.
+     *
+     * @param host
+     *             Hostname of PostgreSQL Cluster.
+     * @param port
+     *             Port that PostgreSQL Cluster is listening on.
+     *
+     * @return InetSocketAddresses
+     */
+    protected InetSocketAddress getAddress(String host, String port) {
+        if (host == null || host.trim().isEmpty()) {
+            return null;
+        }
+
+        if (port == null || port.trim().isEmpty()) {
+            port = POSTGRESQL_PORT_DEFAULT;
+        }
+
+        return new InetSocketAddress(host.trim(), Integer.parseInt(port.trim()));
+    }
+
+    private class JDBCConnectionHolder {
+        private final String connectionUrl;
+        private final Properties connectionProps = new Properties();
+        private final long connectionTimeoutMillis;
+
+        private Connection connection;
+        private boolean isReplicationConnection = false;
+
+        private JDBCConnectionHolder(InetSocketAddress address, String database, String username, String password,
+                boolean isReplicationConnection, long connectionTimeoutMillis) {
+            this.connectionUrl = "jdbc:postgresql://" + address.getHostString() + ":" + address.getPort() + "/"
+                    + database;
+            this.connectionTimeoutMillis = connectionTimeoutMillis;
+
+            if (isReplicationConnection) {
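+                // pgjdbc properties required for logical replication: "replication=database"
+                // opens a walsender session against the given database, and
+                // "preferQueryMode=simple" is required because the replication protocol only
+                // supports the simple query protocol. "assumeMinServerVersion" lets the driver
+                // send version-dependent startup parameters at connect time.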
+                this.isReplicationConnection = isReplicationConnection;
+                this.connectionProps.put("assumeMinServerVersion", POSTGRESQL_MIN_VERSION);
+                this.connectionProps.put("replication", "database");
+                this.connectionProps.put("preferQueryMode", "simple");
+            }
+
+            this.connectionProps.put("user", username);
+            this.connectionProps.put("password", password);
+        }
+
+        private Connection getConnection() throws SQLException {
+            if (connection != null && connection.isValid((int) (connectionTimeoutMillis / 1000))) {
+                getLogger().trace("Returning the pooled JDBC connection.");
+                return connection;
+            }
+
+            // Close the existing connection just in case.
+            close();
+
+            getLogger().trace("Creating a new JDBC connection.");
+            connection = DriverManager.getConnection(connectionUrl, connectionProps);
+
+            // Set auto commit for query connection.
+            if (!isReplicationConnection) {
+                connection.setAutoCommit(true);
+            }
+
+            return connection;
+        }
+
+        private void close() {
+            if (connection != null) {
+                try {
+                    getLogger().trace("Closing the pooled JDBC connection.");
+                    connection.close();
+                } catch (SQLException e) {
+                    getLogger().warn("Failed to close JDBC connection due to " + e, e);
+                }
+            }
+        }
+    }
+
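+    /**
+     * Wraps the dynamically loaded driver so it can be registered with DriverManager:
+     * DriverManager only hands out drivers whose classes are visible to the calling
+     * class's ClassLoader, so a shim loaded by this NAR's ClassLoader delegates to
+     * the driver loaded from the configured driver location.
+     */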
+    private static class DriverShim implements Driver {

Review Comment:
   Is there a particular reason for using this DriverShim wrapper?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
+    @OnStopped
+    public void onStopped(ProcessContext context) {
+        try {
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown(ProcessContext context) {
+        try {
+            // In case we get shutdown while still running, save off the current state,
+            // disconnect, and shut down gracefully.
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }

Review Comment:
   Is there a reason for this method in addition to `OnStopped`? As indicated in the documentation, it is not guaranteed that this method will be called, and in general `OnStopped` should be sufficient since it is doing the same thing.





[GitHub] [nifi] mqofori commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "mqofori (via GitHub)" <gi...@apache.org>.
mqofori commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1559668799

   > Hello @martinson, this is happening because of the way logical replication works in PostgreSQL... If you look at the Debezium Kafka connector, it has the same problem... The workaround is to create a table with only one row and send an update to that row, changing a datetime column for example... This generally happens on PostgreSQL RDS... Is this your case? You can send that update to the heartbeat table minute by minute, for example.
   
   Thank you @gerdansantos, I tried this yesterday and had a few issues, but I couldn't spend more time on it due to some other production issues. In principle, I think this solution should work. I will test and report back with more concrete results sometime early tomorrow. Thanks.




[GitHub] [nifi] mqofori commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "mqofori (via GitHub)" <gi...@apache.org>.
mqofori commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1559705445

   > Thanks for updating and preparing the new PR @davyam. Other reviewers are probably in a better position to evaluate some of the functionality, but I provided a few comments on style and functionality.
   > 
   > At a high-level, one particular question concerns the use of JSON as the output format, and one message per FlowFile. For high-volume processing, the ability to combine multiple records in a single FlowFile allows for much higher throughput. As mentioned in the detailed comments, adding support for a RecordWriter service would also make the output much more flexible, supporting JSON as well as other types.
   
   By the way, I agree with this. After testing this for some time, enhancements can definitely be made to improve the performance and scalability of this processor. The current one-record-per-FlowFile approach is very slow for bulk operations, say deleting or updating 100K+ records. Some sort of batching strategy could be particularly useful for high-volume database operations, helping to maximize throughput and address the potential performance issues @exceptionfactory has mentioned; a rough sketch follows.
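   
   For illustration, a rough sketch of what batching could look like inside the existing try block in `onTrigger` (the `batchSize` value and the JSON-array framing are hypothetical, not something this PR implements):
   
       // Accumulate up to batchSize change messages into a single FlowFile,
       // framed as a JSON array, instead of one FlowFile per message.
       List<String> records = new ArrayList<>();
       HashMap<String, Object> message;
       while (records.size() < batchSize && (message = replicationReader.readMessage()) != null) {
           if (!message.isEmpty()) {
               records.add(replicationReader.convertMessageToJSON(message));
           }
       }
       if (!records.isEmpty()) {
           final String payload = "[" + String.join(",", records) + "]";
           FlowFile flowFile = session.create();
           flowFile = session.write(flowFile, out -> out.write(payload.getBytes(StandardCharsets.UTF_8)));
           flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
           session.transfer(flowFile, REL_SUCCESS);
       }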




[GitHub] [nifi] davyam commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
davyam commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1130131396

   @exceptionfactory thanks for the review!
   
   I'll check all suggestions and apply the necessary adjustments as soon as possible.
   
   About the RecordWriter Service, you're right, there are many benefits, but we see that it isn't essential for now. There are a lot of processors that don't use services yet. So, as this improvement requires lots of changes, we'll leave it for a future version of the processor.




[GitHub] [nifi] janis-ax commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "janis-ax (via GitHub)" <gi...@apache.org>.
janis-ax commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1399623781

   > Hi @davyam: We need the NiFi CaptureChangePostgreSQL processor for one of our requirements. I have gone through the above comments, and I understand you are busy with other activities as well. But can you please share any tentative date by when we can get a stable CaptureChangePostgreSQL processor?
   
   @shilpaprotiviti if the processor is so urgent for you: I built and uploaded the NAR directly [here](https://github.com/apache/nifi/files/10325786/nifi-cdc-postgresql-nar-1.20.0-SNAPSHOT.nar.zip). So, you can download and use the processor. :)
   




[GitHub] [nifi] mqofori commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "mqofori (via GitHub)" <gi...@apache.org>.
mqofori commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1554780541

   @davyam thanks for all your work on this. 
   
   I am using the NAR that @janis-ax helped build and I keep losing connection to my server. This usually happens after the processor has been running for a while with no active changes. When I stop, re-enter the DB server password, and start the server, the error goes away.
   
   Do you know what could be going on or how I can resolve this? Also, should we expect a stable official version any time soon?
   
   ![image](https://github.com/apache/nifi/assets/48808182/74cd0165-bb7d-4269-b80b-284d1759f1cc)
   
   




[GitHub] [nifi] gerdansantos commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "gerdansantos (via GitHub)" <gi...@apache.org>.
gerdansantos commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1557327454

   Hello @Martinson,
   this is happening because of the way logical replication works in
   PostgreSQL... If you look at the Debezium Kafka connector, it has the same
   problem... The workaround is to create a table with only one row and send an
   update to that row, changing a datetime column for example...
   This generally happens on PostgreSQL RDS... Is this your case?
   
   You can send that update to the heartbeat table minute by minute, for
   example, as in the sketch below.
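   
   For illustration, a minimal heartbeat along these lines (the table name,
   publication name, and connection details are hypothetical; adjust to your
   environment):
   
       // One-row heartbeat table, created once:
       //   CREATE TABLE heartbeat (id int PRIMARY KEY, ts timestamptz);
       //   INSERT INTO heartbeat VALUES (1, now());
       //   ALTER PUBLICATION nifi_pub ADD TABLE heartbeat;
       // Then touch the row periodically (e.g. once a minute) so the replication
       // stream always has traffic and the connection is not idle
       // (java.sql.Connection/DriverManager/Statement; jdbcUrl/user/password assumed):
       try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
               Statement st = conn.createStatement()) {
           st.executeUpdate("UPDATE heartbeat SET ts = now() WHERE id = 1");
       }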
   




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6053:
URL: https://github.com/apache/nifi/pull/6053#discussion_r878042727


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/pom.xml:
##########
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cdc-postgresql-bundle</artifactId>
+        <version>1.17.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-cdc-postgresql-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cdc-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.3.3</version>
+        </dependency>

Review Comment:
   Reviewing this dependency, given that the processor has a property for the driver location, either this dependency should be marked as provided, or the property can be removed. It seems best to avoid including this library in the NAR to support more runtime flexibility.





[GitHub] [nifi] gerdansantos commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
gerdansantos commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1129919682

   Now, after two years!!! Will we go on? I believe so!!!




[GitHub] [nifi] davyam commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
davyam commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1134998581

   > Okay, but how do you take an initial snapshot of the database and immediately start the CDC process?
   
   There were several problems with the initial snapshot (delay, size, etc.), so we decided to remove that option when the code was refactored. Our goal is to capture the changes.
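   
   For anyone who still needs a one-time baseline, a minimal sketch of taking it manually outside the processor (the slot name, plugin choice, and table are illustrative assumptions, not what this processor creates):
   
   ```sql
   -- Session 1: create the logical replication slot first, so every change
   -- committed from this point on is retained for the CDC stream.
   SELECT * FROM pg_create_logical_replication_slot('nifi_slot', 'pgoutput');
   
   -- Session 2: bulk-read the existing rows with a stable snapshot.
   BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
   SELECT * FROM my_table;  -- load this result into the target system
   COMMIT;
   
   -- Rows committed between the two steps can appear both in the bulk read
   -- and in the stream, so the initial load should be idempotent.
   ```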




[GitHub] [nifi] davyam commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
davyam commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1133954910

   > I have a problem.
   
   Hey @djshura2008, you are using a previous version of the CaptureChangePostgreSQL processor, maybe from PR 4065. I know that because the "Take a initial snapshot?" property no longer exists. A lot of improvements have been made since then, so please repeat the tests with the latest version. Thanks for your support.




[GitHub] [nifi] MaheshKungaria commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
MaheshKungaria commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1354348887

   Hi @davyam - We need the NiFi CaptureChangePostgreSQL processor for one of our requirements, and we checked that version 1.19 doesn't have it. May I know when we can expect a stable CaptureChangePostgreSQL processor?




[GitHub] [nifi] davyam commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
davyam commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1368052164

   Hey guys, 
   
   we are a little late, I know; other personal activities got in the way, but I'm still working on the improvements.
   
   We will have the CaptureChangePostgreSQL processor in NiFi as soon as possible.
   
   Best regards




[GitHub] [nifi] davyam commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "davyam (via GitHub)" <gi...@apache.org>.
davyam commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1401239864

   > @shilpaprotiviti if the processor is so urgent for you: I built and uploaded the NAR directly [here](https://github.com/apache/nifi/files/10325786/nifi-cdc-postgresql-nar-1.20.0-SNAPSHOT.nar.zip), so you can download and use the processor. :)
   
   Thanks @janis-ax for your support!




[GitHub] [nifi] davyam commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
davyam commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1129440527

   @joewitt @mattyb149 @exceptionfactory 
   
   Hey guys, this is the new PR for NIFI-4239 (CaptureChangePostgreSQL processor). 
   
   Sorry for the mess in #5710 =/
   
   Please, run the workflow. Thanks!




[GitHub] [nifi] exceptionfactory commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1130205527

   > About the RecordWriter Service, you're right, there are many benefits, but we see that isn't essential for now. There are a lot of processors that don't use services yet. So, as this improvement requires lots of change, we'll leave that for a future processor's version.
   
   Thanks for the reply @davyam. Although record-oriented support may not be required in some cases, introducing this processor without support for a RecordWriter would lock in support for the JSON-specific output.
   
   Understanding that it requires some changes, and that a lot of work has already gone into this component, it is also important to consider community maintainability. Components designed for narrow use cases are difficult to maintain over time, which is why it can be more difficult to introduce new components. Generating large numbers of small FlowFiles leads to poor flow performance, which is why it is more important to implement that capability initially rather than introducing it later. If you need assistance with integrating record handling, perhaps this could be an opportunity for additional collaboration on this PR.




[GitHub] [nifi] MaheshKungaria commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
MaheshKungaria commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1269411291

   Thanks for the confirmation @davyam 




[GitHub] [nifi] davyam commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
davyam commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1130246347

   > Generating large numbers of small FlowFiles leads to poor flow performance, which is why it is more important to implement that capability initially rather than introducing it later. If you need assistance with integrating record handling, perhaps this could be an opportunity for additional collaboration on this PR.
   
   @exceptionfactory I understand. OK, I'll analyze the effort required to use the RecordWriter Service.




[GitHub] [nifi] exceptionfactory commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1560037836

   This pull request has generated a lot of helpful feedback and interest, but given the current fundamental issues with lack of event batching and some design concerns with event decoding, there are some significant areas to address.
   
   Additional discussion can be continued on the associated Jira issue [NIFI-4239](https://issues.apache.org/jira/browse/NIFI-4239). The Jira issue is a better place to continue the discussion until this pull request can be revisited or restarted.
   
   Given the lack of code changes in the last year, I am closing this pull request for now. This pull request can be reopened when ready, or a new pull request that addresses the current issues is another option.




[GitHub] [nifi] janis-ax commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
janis-ax commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1368036088

   Hey @MaheshKungaria if it's urgent for you: I built the processor, so you can try it out by putting the NAR file into your lib folder. 
   
   [nifi-cdc-postgresql-nar-1.20.0-SNAPSHOT.nar.zip](https://github.com/apache/nifi/files/10325786/nifi-cdc-postgresql-nar-1.20.0-SNAPSHOT.nar.zip)
   




[GitHub] [nifi] mattyb149 commented on a diff in pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on code in PR #6053:
URL: https://github.com/apache/nifi/pull/6053#discussion_r890241015


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/pom.xml:
##########
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cdc-postgresql-bundle</artifactId>
+        <version>1.17.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-cdc-postgresql-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cdc-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.3.3</version>
+        </dependency>

Review Comment:
   Agreed, I think we should keep the property and remove this JAR from the NAR so we can support maximum compatibility across past, present, and future versions of PostgreSQL without having to maintain/update the version of the JDBC driver across releases.
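   
   A sketch of what that change would look like, reusing the coordinates from the diff above with only the scope added:
   
   ```xml
   <dependency>
       <groupId>org.postgresql</groupId>
       <artifactId>postgresql</artifactId>
       <version>42.3.3</version>
       <!-- provided: the driver is loaded at runtime from the processor's
            driver-location property instead of being bundled in the NAR -->
       <scope>provided</scope>
   </dependency>
   ```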





[GitHub] [nifi] shilpaprotiviti commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by GitBox <gi...@apache.org>.
shilpaprotiviti commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1386586052

   We need the NiFi CaptureChangePostgreSQL processor for one of our requirements. I have gone through the above comments and understand you are busy with other activities as well, but can you please share a tentative date by which we can get a stable CaptureChangePostgreSQL processor?




[GitHub] [nifi] janis-ax commented on pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "janis-ax (via GitHub)" <gi...@apache.org>.
janis-ax commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1399623545

   > Hi @davyam: We need the NiFi CaptureChangePostgreSQL processor for one of our requirements. I have gone through the above comments and understand you are busy with other activities as well, but can you please share a tentative date by which we can get a stable CaptureChangePostgreSQL processor?
   
   @shilpaprotiviti if the processor is so urgent for you: I built and uploaded the NAR directly [here](https://github.com/apache/nifi/files/10325786/nifi-cdc-postgresql-nar-1.20.0-SNAPSHOT.nar.zip), so you can download and use the processor. :) 
   




[GitHub] [nifi] exceptionfactory closed pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory closed pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API
URL: https://github.com/apache/nifi/pull/6053




Re: [PR] NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API [nifi]

Posted by "neha-120 (via GitHub)" <gi...@apache.org>.
neha-120 commented on PR #6053:
URL: https://github.com/apache/nifi/pull/6053#issuecomment-1821309426

   Hi there,
   I am using the NAR that @janis-ax built, but I have a requirement to use an SSL-authenticated Postgres DB. Is there a way to configure SSL in this CDC Postgres NAR? 
   

