Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/05/18 13:30:47 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6053: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

exceptionfactory commented on code in PR #6053:
URL: https://github.com/apache/nifi/pull/6053#discussion_r875871477


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/pom.xml:
##########
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cdc-postgresql-bundle</artifactId>
+        <version>1.17.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-cdc-postgresql-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cdc-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   These dependencies can be removed since they are included by default in the root Maven configuration.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in a format that is defined by the
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and returns a
+     * key-value message.
+     *
+     * @param buffer
+     *                           The encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);

Review Comment:
   ```suggestion
                   String relationDefinition = new String(bytes_R, StandardCharsets.UTF_8);
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in a format that is defined by the
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and returns a
+     * key-value message.
+     *
+     * @param buffer
+     *                           The encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_U);
+                }
+
+                message.put("relationName", this.relations.get(relationId_U).getObjectName());
+
+                /*
+                 * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                 * or as an old tuple ('O') or as a new tuple ('N').
+                 */
+                char tupleType1 = (char) buffer.get(position);
+                position += 1;
+
+                if (includeAllMetadata) {
+                    message.put("tupleType1", tupleType1);
+                }
+
+                /* TupleData N, K or O */
+                Object[] tupleData1 = parseTupleData(relationId_U, buffer, position);
+
+                if (includeAllMetadata) {
+                    message.put("tupleData1", tupleData1[0]);
+                }
+
+                if (tupleType1 == 'N') {
+                    if (!includeAllMetadata) {
+                        /* TupleData N */
+                        message.put("tupleData", tupleData1[0]);
+                    }
+
+                    return message;
+                }
+
+                position = (Integer) tupleData1[1];
+
+                if (includeAllMetadata) {
+                    char tupleType2 = (char) buffer.get(position);
+
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K') or
+                     * as an old tuple ('O') or as a new tuple ('N').
+                     */
+                    message.put("tupleType2", tupleType2);
+                }
+
+                position += 1;
+
+                if (includeAllMetadata) {
+                    /* TupleData N */
+                    message.put("tupleData2", parseTupleData(relationId_U, buffer, position)[0]);
+                } else {
+                    /* TupleData N */
+                    message.put("tupleData", parseTupleData(relationId_U, buffer, position)[0]);
+                }
+
+                return message;
+
+            /* Identifies the message as a delete message. */
+            case 'D':
+
+                message.put("type", "delete");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_D = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_D);
+                }
+
+                message.put("relationName", this.relations.get(relationId_D).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                     * or as an old tuple ('O').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(position));
+                }
+
+                position += 1;
+
+                /* TupleData */
+                message.put("tupleData", parseTupleData(relationId_D, buffer, position)[0]);
+
+                return message;
+
+            default:
+
+                message.put("type", "error");
+                message.put("description", "Unknown message type \"" + msgType + "\".");
+                return message;
+        }
+    }
+
+    /**
+     * Decodes the tuple data (row) included in the binary buffer for INSERT,
+     * UPDATE and DELETE events.
+     *
+     * @param relationId
+     *                   Table ID used to get the table name from the Decoder relations
+     *                   field.
+     * @param buffer
+     *                   Binary buffer of the event.
+     * @param position
+     *                   Position at which the tuple data starts.
+     * @return Object[]
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to String
+     */
+    private Object[] parseTupleData(int relationId, ByteBuffer buffer, int position)
+            throws UnsupportedEncodingException {
+
+        HashMap<String, Object> data = new HashMap<String, Object>();
+        Object[] result = { data, position };
+
+        /* (Int16) Number of columns. */
+        short columns = buffer.getShort(position);
+        /* short = 2 bytes */
+        position += 2;
+
+        for (int i = 1; i <= columns; i++) {
+            /*
+             * (Byte1) Either identifies the data as NULL value ('n') or unchanged TOASTed
+             * value ('u') or text formatted value ('t').
+             */
+            char dataFormat = (char) buffer.get(position);
+            /* byte = 1 byte */
+            position += 1;
+
+            Column column = relations.get(relationId).getColumn(i);
+
+            if (dataFormat == 't') {
+                /* (Int32) Length of the column value. */
+                int lenValue = buffer.getInt(position);
+                /* int = 4 bytes */
+                position += 4;
+
+                buffer.position(position);
+                byte[] bytes = new byte[lenValue];
+                buffer.get(bytes);
+                /* String = length * bytes */
+                position += lenValue;
+
+                /*
+                 * (ByteN) The value of the column, in text format.
+                 * Numeric types are not quoted.
+                 */
+                if (column.getDataTypeName() != null && column.getDataTypeName().startsWith("int")) {
+                    data.put(column.getName(), Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
+                } else {
+                    /* (ByteN) The value of the column, in text format. */
+                    data.put(column.getName(), new String(bytes, StandardCharsets.UTF_8));
+                }
+
+            } else { /* dataFormat = 'n' (NULL value) or 'u' (unchanged TOASTED value) */
+                if (dataFormat == 'n') {
+                    data.put(column.getName(), null);
+                } else {
+                    data.put(column.getName(), "UTOAST");
+                }
+            }
+        }
+
+        result[0] = data;
+        result[1] = position;
+
+        return result;
+    }
+
+    /**
+     * Converts a PostgreSQL epoch timestamp to a human-readable date format.
+     *
+     * @param microseconds
+     *                     Microseconds since 2000-01-01 00:00:00.000.
+     * @return String
+     * @throws ParseException
+     *                        if parsing the start date fails
+     */
+    private String getFormattedPostgreSQLEpochDate(long microseconds) throws ParseException {
+        Date pgEpochDate = new SimpleDateFormat("yyyy-MM-dd").parse("2000-01-01");

Review Comment:
   Use of SimpleDateFormat should be replaced with DateTimeFormatter. This particular calculation should be done once and stored as a static value.
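   For reference, a minimal sketch of that approach (the class and method names here are illustrative, not taken from the PR):
   ```java
   import java.time.Instant;
   import java.time.ZoneId;
   import java.time.format.DateTimeFormatter;
   import java.time.temporal.ChronoUnit;

   public class PostgreSQLEpochFormatter {

       /* PostgreSQL epoch (2000-01-01 00:00:00 UTC), computed once and held as a static value. */
       private static final Instant POSTGRESQL_EPOCH = Instant.parse("2000-01-01T00:00:00Z");

       /* DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat. */
       private static final DateTimeFormatter FORMATTER =
               DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z Z").withZone(ZoneId.systemDefault());

       /* Converts microseconds since the PostgreSQL epoch to a formatted timestamp string. */
       public static String format(final long microseconds) {
           return FORMATTER.format(POSTGRESQL_EPOCH.plus(microseconds, ChronoUnit.MICROS));
       }
   }
   ```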



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/test/java/org/apache/nifi/cdc/postgresql/processors/TestCaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.postgresql.processors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.nifi.cdc.postgresql.event.MockReader;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;

Review Comment:
   JUnit 5 is now the preferred version; can you make the adjustments to use the JUnit 5 classes?
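   For example, the equivalent JUnit 5 imports and annotations would look roughly like this (a sketch only; `MethodOrderer.MethodName` requires JUnit 5.7+):
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertNotNull;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   import static org.mockito.Mockito.mock;

   import org.junit.jupiter.api.AfterEach;
   import org.junit.jupiter.api.BeforeEach;
   import org.junit.jupiter.api.MethodOrderer;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.TestMethodOrder;

   /* Replaces the JUnit 4 @FixMethodOrder(MethodSorters.NAME_ASCENDING) annotation. */
   @TestMethodOrder(MethodOrderer.MethodName.class)
   class TestCaptureChangePostgreSQL {

       @BeforeEach /* replaces the JUnit 4 @Before annotation */
       void setUp() {
       }

       @AfterEach /* replaces the JUnit 4 @After annotation */
       void tearDown() {
       }

       @Test
       void testPlaceholder() {
           /* Placeholder body; the real tests keep their existing assertions. */
           assertTrue(true);
       }
   }
   ```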



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in a format that is defined by the
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and returns a
+     * key-value message.
+     *
+     * @param buffer
+     *                           The encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_U);
+                }
+
+                message.put("relationName", this.relations.get(relationId_U).getObjectName());
+
+                /*
+                 * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                 * or as an old tuple ('O') or as a new tuple ('N').
+                 */
+                char tupleType1 = (char) buffer.get(position);
+                position += 1;
+
+                if (includeAllMetadata) {
+                    message.put("tupleType1", tupleType1);
+                }
+
+                /* TupleData N, K or O */
+                Object[] tupleData1 = parseTupleData(relationId_U, buffer, position);
+
+                if (includeAllMetadata) {
+                    message.put("tupleData1", tupleData1[0]);
+                }
+
+                if (tupleType1 == 'N') {
+                    if (!includeAllMetadata) {
+                        /* TupleData N */
+                        message.put("tupleData", tupleData1[0]);
+                    }
+
+                    return message;
+                }
+
+                position = (Integer) tupleData1[1];
+
+                if (includeAllMetadata) {
+                    char tupleType2 = (char) buffer.get(position);
+
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K') or
+                     * as an old tuple ('O') or as a new tuple ('N').
+                     */
+                    message.put("tupleType2", tupleType2);
+                }
+
+                position += 1;
+
+                if (includeAllMetadata) {
+                    /* TupleData N */
+                    message.put("tupleData2", parseTupleData(relationId_U, buffer, position)[0]);
+                } else {
+                    /* TupleData N */
+                    message.put("tupleData", parseTupleData(relationId_U, buffer, position)[0]);
+                }
+
+                return message;
+
+            /* Identifies the message as a delete message. */
+            case 'D':
+
+                message.put("type", "delete");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_D = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_D);
+                }
+
+                message.put("relationName", this.relations.get(relationId_D).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                     * or as an old tuple ('O').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(position));
+                }
+
+                position += 1;
+
+                /* TupleData */
+                message.put("tupleData", parseTupleData(relationId_D, buffer, position)[0]);
+
+                return message;
+
+            default:
+
+                message.put("type", "error");
+                message.put("description", "Unknown message type \"" + msgType + "\".");
+                return message;
+        }
+    }
+
+    /**
+     * Decodes the tuple data (row) included in the binary buffer for INSERT,
+     * UPDATE and DELETE events.
+     *
+     * @param relationId
+     *                   Table ID used to get the table name from the Decoder relations
+     *                   field.
+     * @param buffer
+     *                   Binary buffer of the event.
+     * @param position
+     *                   Position at which the tuple data starts.
+     * @return Object[]
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to String
+     */
+    private Object[] parseTupleData(int relationId, ByteBuffer buffer, int position)
+            throws UnsupportedEncodingException {
+
+        HashMap<String, Object> data = new HashMap<String, Object>();
+        Object[] result = { data, position };
+
+        /* (Int16) Number of columns. */
+        short columns = buffer.getShort(position);
+        /* short = 2 bytes */
+        position += 2;
+
+        for (int i = 1; i <= columns; i++) {
+            /*
+             * (Byte1) Either identifies the data as NULL value ('n') or unchanged TOASTed
+             * value ('u') or text formatted value ('t').
+             */
+            char dataFormat = (char) buffer.get(position);
+            /* byte = 1 byte */
+            position += 1;
+
+            Column column = relations.get(relationId).getColumn(i);
+
+            if (dataFormat == 't') {
+                /* (Int32) Length of the column value. */
+                int lenValue = buffer.getInt(position);
+                /* int = 4 bytes */
+                position += 4;
+
+                buffer.position(position);
+                byte[] bytes = new byte[lenValue];
+                buffer.get(bytes);
+                /* String = length * bytes */
+                position += lenValue;
+
+                /*
+                 * (ByteN) The value of the column, in text format.
+                 * Numeric types are not quoted.
+                 */
+                if (column.getDataTypeName() != null && column.getDataTypeName().startsWith("int")) {
+                    data.put(column.getName(), Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
+                } else {
+                    /* (ByteN) The value of the column, in text format. */
+                    data.put(column.getName(), new String(bytes, StandardCharsets.UTF_8));
+                }
+
+            } else { /* dataFormat = 'n' (NULL value) or 'u' (unchanged TOASTED value) */
+                if (dataFormat == 'n') {
+                    data.put(column.getName(), null);
+                } else {
+                    data.put(column.getName(), "UTOAST");
+                }
+            }
+        }
+
+        result[0] = data;
+        result[1] = position;
+
+        return result;
+    }
+
+    /**
+     * Converts a PostgreSQL epoch timestamp to a human-readable date format.
+     *
+     * @param microseconds
+     *                     Microseconds since 2000-01-01 00:00:00.000.
+     * @return String
+     * @throws ParseException
+     *                        if parsing the start date fails
+     */
+    private String getFormattedPostgreSQLEpochDate(long microseconds) throws ParseException {
+        Date pgEpochDate = new SimpleDateFormat("yyyy-MM-dd").parse("2000-01-01");
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(pgEpochDate);
+        cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + (int) (microseconds / 1000000));
+
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z Z").format(cal.getTime());
+    }
+
+    /**
+     * Indicates whether the dataTypes field is empty.
+     *
+     * @return boolean
+     */
+    public boolean isDataTypesEmpty() {
+        return this.dataTypes.isEmpty();
+    }
+
+    /**
+     * Loads the PostgreSQL data type set from the pg_catalog.pg_type system view.
+     * This set is used to enrich messages with the data type name.
+     *
+     * @param queryConnection
+     *                        Connection to PostgreSQL database.
+     * @throws SQLException
+     *                      if accessing the PostgreSQL database fails
+     */
+    public void loadDataTypes(Connection queryConnection) throws SQLException {
+        Statement stmt = queryConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        ResultSet rs = stmt.executeQuery("SELECT oid, typname FROM pg_catalog.pg_type");
+
+        while (rs.next()) {
+            this.dataTypes.put(rs.getInt(1), rs.getString(2));
+        }
+
+        rs.close();
+        stmt.close();
+    }
+
+    /**
+     * Manually sets the data type map used to enrich messages.
+     *
+     * @param dataTypes
+     *                  A map of data type IDs to data type names.
+     */
+    public void setDataTypes(HashMap<Integer, String> dataTypes) {
+        this.dataTypes = dataTypes;
+    }

Review Comment:
   These methods for loading data types should be moved to a new class. The `setDataTypes()` can remain here, but the retrieval of data types from a JDBC Connection can be moved to a separate class.
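   One possible shape for that separation (the class name `DataTypeLoader` is hypothetical):
   ```java
   import java.sql.Connection;
   import java.sql.ResultSet;
   import java.sql.SQLException;
   import java.sql.Statement;
   import java.util.HashMap;

   /* Encapsulates retrieval of PostgreSQL data type names from a JDBC Connection. */
   public class DataTypeLoader {

       public static HashMap<Integer, String> loadDataTypes(final Connection connection) throws SQLException {
           final HashMap<Integer, String> dataTypes = new HashMap<>();

           /* try-with-resources guarantees the Statement and ResultSet are closed. */
           try (Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
                   ResultSet resultSet = statement.executeQuery("SELECT oid, typname FROM pg_catalog.pg_type")) {
               while (resultSet.next()) {
                   dataTypes.put(resultSet.getInt(1), resultSet.getString(2));
               }
           }

           return dataTypes;
       }
   }
   ```
   The decoder could then be populated with `decoder.setDataTypes(DataTypeLoader.loadDataTypes(queryConnection));`.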



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in a format that is defined by the
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and returns a
+     * key-value message.
+     *
+     * @param buffer
+     *                           The encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];

Review Comment:
   Use of the underscore character should be avoided in variable names.
   ```suggestion
                       byte[] origin = new byte[buffer.remaining()];
   ```
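
   As an aside on the BEGIN/COMMIT timestamp fields in this hunk: the wire value is microseconds since the PostgreSQL epoch (2000-01-01T00:00:00Z). A minimal conversion sketch, assuming `java.time` is acceptable here (the method name is hypothetical):
   ```java
   import java.time.Instant;
   import java.time.temporal.ChronoUnit;

   /* PostgreSQL epoch: 2000-01-01T00:00:00Z. */
   static final Instant POSTGRES_EPOCH = Instant.parse("2000-01-01T00:00:00Z");

   /* Converts microseconds since the PostgreSQL epoch to an Instant. */
   static Instant fromPostgresMicros(long micros) {
       return POSTGRES_EPOCH.plus(micros, ChronoUnit.MICROS);
   }
   ```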



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the logical
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and
+     * returns a key-value message.
+     *
+     * @param buffer
+     *                           Encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap containing the decoded message fields
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert
+     *                                      bytes to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);

Review Comment:
   ```suggestion
                int insertRelationId = buffer.getInt(position);
   ```
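
   On the null-terminated string parsing in the RELATION case above: a small helper that scans the buffer directly would avoid decoding the entire buffer into a String first. A rough sketch (hypothetical helper, not in this PR):
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   /* Reads a null-terminated UTF-8 string starting at the given absolute offset. */
   static String readNullTerminatedString(ByteBuffer buffer, int start) {
       int end = start;
       while (buffer.get(end) != 0) {
           end++;
       }
       byte[] bytes = new byte[end - start];
       for (int i = 0; i < bytes.length; i++) {
           bytes[i] = buffer.get(start + i);
       }
       return new String(bytes, StandardCharsets.UTF_8);
   }
   ```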



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the logical
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and
+     * returns a key-value message.
+     *
+     * @param buffer
+     *                           Encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap containing the decoded message fields
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert
+     *                                      bytes to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];

Review Comment:
   ```suggestion
                       byte[] type = new byte[buffer.capacity()];
   ```
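
   For what it is worth, this decode path is easy to exercise in isolation; a hypothetical JUnit 4 sketch that hand-builds a 21-byte BEGIN message (big-endian, matching the wire format) and checks the decoded map:
   ```java
   import java.nio.ByteBuffer;
   import java.util.HashMap;

   import org.junit.Assert;
   import org.junit.Test;

   public class TestDecoderBegin {

       @Test
       public void testDecodeBeginMessage() throws Exception {
           ByteBuffer begin = ByteBuffer.allocate(21);
           begin.put((byte) 'B'); // message type
           begin.putLong(42L);    // final LSN of the transaction
           begin.putLong(0L);     // commit time: 0 microseconds since 2000-01-01
           begin.putInt(731);     // xid

           HashMap<String, Object> message = new Decoder()
                   .decodeLogicalReplicationBuffer(begin, true, false);

           Assert.assertEquals("begin", message.get("type"));
           Assert.assertEquals(42L, message.get("xLSNFinal"));
           Assert.assertEquals(731, message.get("xid"));
       }
   }
   ```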



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the logical
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and
+     * returns a key-value message.
+     *
+     * @param buffer
+     *                           Encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap containing the decoded message fields
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert
+     *                                      bytes to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);

Review Comment:
   ```suggestion
                       String typeDefinition = new String(bytes_Y, StandardCharsets.UTF_8);
   ```
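
   One caveat with the `position += name.length() + 1` arithmetic used throughout these cases: `String.length()` counts UTF-16 chars, not UTF-8 bytes, so the buffer position drifts for non-ASCII identifiers. A tiny illustration:
   ```java
   import java.nio.charset.StandardCharsets;

   String name = "número";                                        // 6 chars
   int charLength = name.length();                                // 6
   int byteLength = name.getBytes(StandardCharsets.UTF_8).length; // 7 ('ú' is 2 bytes)
   // Advancing the buffer position by charLength + 1 lands one byte short here.
   ```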



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the logical
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and
+     * returns a key-value message.
+     *
+     * @param buffer
+     *                           Encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap containing the decoded message fields
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert
+     *                                      bytes to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);

Review Comment:
   ```suggestion
                   int updateRelationId = buffer.getInt(position);
   ```
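
   A protocol note for this UPDATE case: per the PostgreSQL logical replication message documentation, an Update message may carry an optional old tuple (tagged 'K' or 'O', depending on REPLICA IDENTITY) before the new tuple tagged 'N'. A guard along these lines would make that explicit (sketch only):
   ```java
   char tupleTag = (char) buffer.get(position);
   if (tupleTag == 'K' || tupleTag == 'O') {
       // An old tuple (key columns or the full old row) precedes the new tuple;
       // it must be decoded or skipped before reading the 'N' TupleData.
   }
   ```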



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * Decoder is the class responsible for converting a binary buffer event into a
+ * key-value message. The binary buffer represents an event change (BEGIN,
+ * COMMIT, INSERT, UPDATE, DELETE, etc.) in the format defined by the logical
+ * decoding output plugin (pgoutput). Decoder also uses the dataTypes field to
+ * enrich RELATION messages with the name of each column's data type, and the
+ * relations field to enrich INSERT, UPDATE and DELETE messages with the
+ * relation name.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Table
+ * @see org.apache.nifi.cdc.postgresql.event.Column
+ */
+public class Decoder {
+
+    private HashMap<Integer, String> dataTypes = new HashMap<Integer, String>();
+    private HashMap<Integer, Table> relations = new HashMap<Integer, Table>();
+
+    /**
+     * Decodes the binary buffer event read from the replication stream and
+     * returns a key-value message.
+     *
+     * @param buffer
+     *                           Encoded event change.
+     * @param includeBeginCommit
+     *                           If TRUE, BEGIN and COMMIT events are returned.
+     *                           If FALSE, the message returned for these events
+     *                           is empty.
+     * @param includeAllMetadata
+     *                           If TRUE, all received metadata is included in the
+     *                           message and all events are returned.
+     *                           If FALSE, additional metadata (relation id, tuple
+     *                           type, etc.) is not included in the message. Also,
+     *                           the message returned for RELATION, ORIGIN and TYPE
+     *                           events is empty.
+     * @return HashMap containing the decoded message fields
+     * @throws ParseException
+     *                                      if decoding of PostgreSQL
+     *                                      epoch dates fails
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert
+     *                                      bytes to String
+     */
+    public HashMap<String, Object> decodeLogicalReplicationBuffer(ByteBuffer buffer, boolean includeBeginCommit,
+            boolean includeAllMetadata) throws ParseException, UnsupportedEncodingException {
+
+        HashMap<String, Object> message = new HashMap<String, Object>();
+
+        /* (Byte1) Identifies the message type. */
+        char msgType = (char) buffer.get(0);
+        int position = 1;
+
+        switch (msgType) {
+            /* Identifies the message as a BEGIN message. */
+            case 'B':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "begin");
+
+                    /* (Int64) The final LSN of the transaction. */
+                    message.put("xLSNFinal", buffer.getLong(1));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime",
+                            getFormattedPostgreSQLEpochDate(buffer.getLong(9)));
+
+                    /* (Int32) Xid of the transaction. */
+                    message.put("xid", buffer.getInt(17));
+                }
+                return message;
+
+            /* Identifies the message as a COMMIT message. */
+            case 'C':
+
+                if (includeBeginCommit || includeAllMetadata) {
+                    message.put("type", "commit");
+
+                    /* (Byte1) Flags; currently unused (must be 0). */
+                    message.put("flags", buffer.get(1));
+
+                    /* (Int64) The LSN of the commit. */
+                    message.put("commitLSN", buffer.getLong(2));
+
+                    /* (Int64) The end LSN of the transaction. */
+                    message.put("xLSNEnd", buffer.getLong(10));
+
+                    /*
+                     * (Int64) Commit timestamp of the transaction. The value is in number of
+                     * microseconds since PostgreSQL epoch (2000-01-01).
+                     */
+                    message.put("xCommitTime", getFormattedPostgreSQLEpochDate(
+                            buffer.getLong(18)));
+                }
+                return message;
+
+            /* Identifies the message as an ORIGIN message. */
+            case 'O':
+
+                if (includeAllMetadata) {
+                    message.put("type", "origin");
+
+                    /* (Int64) The LSN of the commit on the origin server. */
+                    message.put("originLSN", buffer.getLong(1));
+
+                    buffer.position(9);
+                    byte[] bytes_O = new byte[buffer.remaining()];
+                    buffer.get(bytes_O);
+
+                    /* (String) Name of the origin. */
+                    message.put("originName", new String(bytes_O, StandardCharsets.UTF_8));
+                }
+                return message;
+
+            /* Identifies the message as a RELATION message. */
+            case 'R':
+
+                Table relation = new Table();
+
+                /* (Int32) ID of the relation. */
+                relation.setId(buffer.getInt(position));
+                position += 4;
+
+                buffer.position(0);
+                byte[] bytes_R = new byte[buffer.capacity()];
+                buffer.get(bytes_R);
+                String string_R = new String(bytes_R, StandardCharsets.UTF_8);
+
+                /* ASCII 0 = Null */
+                int firstStringEnd = string_R.indexOf(0, position);
+
+                /* ASCII 0 = Null */
+                int secondStringEnd = string_R.indexOf(0, firstStringEnd + 1);
+
+                /* (String) Namespace (empty string for pg_catalog). */
+                relation.setNamespace(string_R.substring(position, firstStringEnd));
+
+                /* (String) Relation name. */
+                relation.setName(string_R.substring(firstStringEnd + 1, secondStringEnd));
+
+                /* next position = current position + string length + 1 */
+                position += relation.getNamespace().length() + 1 + relation.getName().length() + 1;
+
+                buffer.position(position);
+
+                /*
+                 * (Byte1) Replica identity setting for the relation (same as relreplident in
+                 * pg_class).
+                 */
+                relation.setReplicaIdentity((char) buffer.get(position));
+                position += 1;
+
+                /* (Int16) Number of columns. */
+                relation.setNumColumns(buffer.getShort(position));
+                position += 2;
+
+                for (int i = 1; i <= relation.getNumColumns(); i++) {
+                    Column column = new Column();
+
+                    /* Position of the column in the table (1-based index). */
+                    column.setPosition(i);
+
+                    /*
+                     * (Byte1) Flags for the column. Currently can be either 0 for no flags or 1
+                     * which marks the column as part of the key.
+                     */
+                    column.setIsKey(buffer.get(position));
+                    position += 1;
+
+                    /* (String) Name of the column. */
+                    column.setName(string_R.substring(position, string_R.indexOf(0, position)));
+                    position += column.getName().length() + 1;
+
+                    /* (Int32) ID of the column's data type. */
+                    column.setDataTypeId(buffer.getInt(position));
+                    position += 4;
+
+                    column.setDataTypeName(this.dataTypes.get(column.getDataTypeId()));
+
+                    /* (Int32) Type modifier of the column (atttypmod). */
+                    column.setTypeModifier(buffer.getInt(position));
+                    position += 4;
+
+                    relation.putColumn(i, column);
+                }
+
+                this.relations.put(relation.getId(), relation);
+
+                if (includeAllMetadata) {
+                    message.put("type", "relation");
+                    message.put("id", relation.getId());
+                    message.put("name", relation.getName());
+                    message.put("objectName", relation.getObjectName());
+                    message.put("replicaIdentity", relation.getReplicaIdentity());
+                    message.put("numColumns", relation.getNumColumns());
+                    message.put("columns", relation.getColumns());
+                }
+
+                return message;
+
+            /* Identifies the message as a TYPE message. */
+            case 'Y':
+
+                if (includeAllMetadata) {
+                    message.put("type", "type");
+
+                    /* (Int32) ID of the data type. */
+                    message.put("dataTypeId", buffer.getInt(position));
+                    position += 4;
+
+                    buffer.position(0);
+                    byte[] bytes_Y = new byte[buffer.capacity()];
+                    buffer.get(bytes_Y);
+                    String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
+
+                    /* (String) Namespace (empty string for pg_catalog). */
+                    message.put("namespaceName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                    position += ((String) message.get("namespaceName")).length() + 1;
+
+                    /* (String) Name of the data type. */
+                    message.put("dataTypeName", string_Y.substring(position, string_Y.indexOf(0, position)));
+                }
+                return message;
+
+            /* Identifies the message as an INSERT message. */
+            case 'I':
+
+                message.put("type", "insert");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_I = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_I);
+                }
+
+                message.put("relationName", this.relations.get(relationId_I).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Identifies the following TupleData message as a new tuple ('N').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(5));
+                }
+
+                position += 1;
+
+                message.put("tupleData", parseTupleData(relationId_I, buffer, position)[0]);
+
+                return message;
+
+            /* Identifies the message as an UPDATE message. */
+            case 'U':
+
+                message.put("type", "update");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_U = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_U);
+                }
+
+                message.put("relationName", this.relations.get(relationId_U).getObjectName());
+
+                /*
+                 * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                 * or as an old tuple ('O') or as a new tuple ('N').
+                 */
+                char tupleType1 = (char) buffer.get(position);
+                position += 1;
+
+                if (includeAllMetadata) {
+                    message.put("tupleType1", tupleType1);
+                }
+
+                /* TupleData N, K or O */
+                Object[] tupleData1 = parseTupleData(relationId_U, buffer, position);
+
+                if (includeAllMetadata) {
+                    message.put("tupleData1", tupleData1[0]);
+                }
+
+                if (tupleType1 == 'N') {
+                    if (!includeAllMetadata) {
+                        /* TupleData N */
+                        message.put("tupleData", tupleData1[0]);
+                    }
+
+                    return message;
+                }
+
+                position = (Integer) tupleData1[1];
+
+                if (includeAllMetadata) {
+                    char tupleType2 = (char) buffer.get(position);
+
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K') or
+                     * as an old tuple ('O') or as a new tuple ('N').
+                     */
+                    message.put("tupleType2", tupleType2);
+                }
+
+                position += 1;
+
+                if (includeAllMetadata) {
+                    /* TupleData N */
+                    message.put("tupleData2", parseTupleData(relationId_U, buffer, position)[0]);
+                } else {
+                    /* TupleData N */
+                    message.put("tupleData", parseTupleData(relationId_U, buffer, position)[0]);
+                }
+
+                return message;
+
+            /* Identifies the message as a delete message. */
+            case 'D':
+
+                message.put("type", "delete");
+
+                /*
+                 * (Int32) ID of the relation corresponding to the ID in the relation message.
+                 */
+                int relationId_D = buffer.getInt(position);
+                position += 4;
+
+                if (includeAllMetadata) {
+                    message.put("relationId", relationId_D);
+                }
+
+                message.put("relationName", this.relations.get(relationId_D).getObjectName());
+
+                if (includeAllMetadata) {
+                    /*
+                     * (Byte1) Either identifies the following TupleData submessage as a key ('K')
+                     * or as an old tuple ('O').
+                     */
+                    message.put("tupleType", "" + (char) buffer.get(position));
+                }
+
+                position += 1;
+
+                /* TupleData */
+                message.put("tupleData", parseTupleData(relationId_D, buffer, position)[0]);
+
+                return message;
+
+            default:
+
+                message.put("type", "error");
+                message.put("description", "Unknown message type \"" + msgType + "\".");
+                return message;
+        }
+    }
+
+    /**
+     * Decodes the tuple data (row) included in the binary buffer for INSERT,
+     * UPDATE and DELETE events.
+     *
+     * @param relationId
+     *                   Table ID used to look up the table in the Decoder's
+     *                   relations field.
+     * @param buffer
+     *                   Binary buffer of the event.
+     * @param position
+     *                   Position at which the tuple data starts.
+     * @return Object[]
+     * @throws UnsupportedEncodingException
+     *                                      if decoding fails to convert bytes
+     *                                      to a String
+     */
+    private Object[] parseTupleData(int relationId, ByteBuffer buffer, int position)
+            throws UnsupportedEncodingException {
+
+        HashMap<String, Object> data = new HashMap<String, Object>();
+        Object[] result = { data, position };
+
+        /* (Int16) Number of columns. */
+        short columns = buffer.getShort(position);
+        /* short = 2 bytes */
+        position += 2;
+
+        for (int i = 1; i <= columns; i++) {
+            /*
+             * (Byte1) Either identifies the data as NULL value ('n') or unchanged TOASTed
+             * value ('u') or text formatted value ('t').
+             */
+            char dataFormat = (char) buffer.get(position);
+            /* byte = 1 byte */
+            position += 1;
+
+            Column column = relations.get(relationId).getColumn(i);
+
+            if (dataFormat == 't') {
+                /* (Int32) Length of the column value. */
+                int lenValue = buffer.getInt(position);
+                /* int = 4 bytes */
+                position += 4;
+
+                buffer.position(position);
+                byte[] bytes = new byte[lenValue];
+                buffer.get(bytes);
+                /* String = length * bytes */
+                position += lenValue;
+
+                /*
+                 * (ByteN) The value of the column, in text format.
+                 * Numeric types are not quoted.
+                 */
+                if (column.getDataTypeName() != null && column.getDataTypeName().startsWith("int")) {
+                    data.put(column.getName(), Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
+                } else {
+                    /* (ByteN) The value of the column, in text format. */
+                    data.put(column.getName(), new String(bytes, StandardCharsets.UTF_8));
+                }
+
+            } else { /* dataFormat = 'n' (NULL value) or 'u' (unchanged TOASTED value) */
+                if (dataFormat == 'n') {
+                    data.put(column.getName(), null);
+                } else {
+                    data.put(column.getName(), "UTOAST");
+                }
+            }
+        }
+
+        result[0] = data;
+        result[1] = position;
+
+        return result;
+    }
+
+    /**
+     * Converts a PostgreSQL epoch timestamp to a human-readable date format.
+     *
+     * @param microseconds
+     *                     Microseconds since 2000-01-01 00:00:00.000.
+     * @return String
+     * @throws ParseException
+     *                        if parsing the epoch start date fails
+     */
+    private String getFormattedPostgreSQLEpochDate(long microseconds) throws ParseException {
+        Date pgEpochDate = new SimpleDateFormat("yyyy-MM-dd").parse("2000-01-01");
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(pgEpochDate);
+        cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + (int) (microseconds / 1000000));
+
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z Z").format(cal.getTime());
+    }
+
+    /**
+     * Indicates whether the dataTypes field is empty.
+     *
+     * @return boolean
+     */
+    public boolean isDataTypesEmpty() {

Review Comment:
   Public methods should be declared before private methods.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Reader.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.event;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.postgresql.PGConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationStream;
+
+/**
+ * Reader is the helper class for the PostgreSQL Replication Stream, which
+ * allows reading event changes (INSERT, UPDATE, DELETE, etc.) from a specific
+ * Publication via a Replication Slot in real time. A start position (LSN) can
+ * be indicated at object creation.
+ *
+ * @see org.apache.nifi.cdc.postgresql.event.Slot
+ * @see org.apache.nifi.cdc.postgresql.event.Decoder
+ */
+public class Reader {
+    private boolean includeBeginCommit;
+    private boolean includeAllMetadata;
+    private String publication;
+
+    private Slot replicationSlot;
+    private PGReplicationStream replicationStream;
+    private Decoder replicationDecoder;
+
+    public Reader(String slot, boolean dropSlotIfExists, String publication, Long lsn, boolean includeBeginCommit,
+            boolean includeAllMetadata, Connection replicationConn, Connection queryConn)
+            throws SQLException {
+
+        try {
+            // Creating replication slot.
+            if (this.replicationSlot == null) {
+                this.replicationSlot = new Slot(slot, dropSlotIfExists, replicationConn, queryConn);
+            }
+        } catch (SQLException e) {
+            throw new SQLException("Failed to create replication slot", e);
+        }
+
+        try {
+            // Creating replication stream.
+            this.publication = publication;
+            PGConnection pgConn = replicationConn.unwrap(PGConnection.class);
+            if (lsn == null) {
+                this.replicationStream = pgConn.getReplicationAPI().replicationStream().logical()
+                        .withSlotName(this.replicationSlot.getName()).withSlotOption("proto_version", "1")
+                        .withSlotOption("publication_names", this.publication).withStatusInterval(1, TimeUnit.SECONDS)
+                        .start();
+            } else {
+                // Reading from LSN start position.
+                LogSequenceNumber startLSN = LogSequenceNumber.valueOf(lsn);
+                this.replicationStream = pgConn.getReplicationAPI().replicationStream().logical()
+                        .withSlotName(this.replicationSlot.getName()).withSlotOption("proto_version", "1")
+                        .withSlotOption("publication_names", this.publication).withStatusInterval(1, TimeUnit.SECONDS)
+                        .withStartPosition(startLSN).start();
+            }
+        } catch (SQLException e) {
+            throw new SQLException("Failed to create replication stream", e);
+        }
+
+        // Creating replication buffer decoder.
+        this.replicationDecoder = new Decoder();
+        try {
+            // Loading data types.
+            if (replicationDecoder.isDataTypesEmpty()) {
+                replicationDecoder.loadDataTypes(queryConn);
+            }
+        } catch (SQLException e) {
+            throw new SQLException("Failed to load data types", e);
+        }
+
+        this.includeBeginCommit = includeBeginCommit;
+        this.includeAllMetadata = includeAllMetadata;
+    }
+
+    /**
+     * Constructor only for unit tests.
+     *
+     * @see org.apache.nifi.cdc.postgresql.event.MockReader
+     */
+    public Reader(Long lsn, boolean includeBeginCommit, boolean includeAllMetadata) {
+    }
+
+    /**
+     * Reads a pending event from the replication slot since the last
+     * feedback. If the received buffer is null, there are no more pending
+     * events. The binary event buffer is decoded into a key-value message.
+     *
+     * @return HashMap (String, Object)
+     * @throws SQLException
+     *                      if reading the event fails
+     * @throws IOException
+     *                      if decoding the event fails
+     *
+     * @see org.apache.nifi.cdc.postgresql.event.Slot
+     * @see org.apache.nifi.cdc.postgresql.event.Decoder
+     */
+    public HashMap<String, Object> readMessage() throws SQLException, IOException {
+        ByteBuffer buffer;
+        try {
+            buffer = this.replicationStream.readPending();
+        } catch (Exception e) {
+            throw new SQLException("Failed to read pending events", e);
+        }
+
+        if (buffer == null) {
+            return null;
+        }
+
+        // Binary buffer decode process.
+        HashMap<String, Object> message;
+        try {
+            message = replicationDecoder.decodeLogicalReplicationBuffer(buffer, this.includeBeginCommit,
+                    this.includeAllMetadata);
+        } catch (Exception e) {
+            throw new IOException("Failed to decode event buffer", e);
+        }
+
+        // Messages not requested/included.
+        if (message.isEmpty()) {
+            return message;
+        }
+
+        LogSequenceNumber lsn = this.replicationStream.getLastReceiveLSN();
+        Long lsnLong = lsn.asLong();
+
+        // Some messages don't have an LSN (e.g. Relation).
+        if (lsnLong > 0) {
+            message.put("lsn", lsnLong);
+        }
+
+        return message;
+    }
+
+    /**
+     * Converts an event message to a JSON String.
+     *
+     * @param message
+     *                The key-value message.
+     * @return String
+     * @throws IOException
+     *                     if message conversion fails
+     */
+    public String convertMessageToJSON(HashMap<String, Object> message) throws IOException {
+        ObjectMapper jsonMapper = new ObjectMapper();
+        try {
+            return jsonMapper.writeValueAsString(message);
+        } catch (JsonProcessingException e) {
+            throw new IOException("Failed to convert event message to JSON", e);
+        }
+    }

Review Comment:
   Recommend moving this utility method to another class since it is specific to JSON conversion.
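   A minimal sketch of such a helper, assuming a new JsonUtils class in the
   same event package (the class name and location here are illustrative, not
   part of this PR):

       package org.apache.nifi.cdc.postgresql.event;

       import java.io.IOException;
       import java.util.Map;

       import com.fasterxml.jackson.core.JsonProcessingException;
       import com.fasterxml.jackson.databind.ObjectMapper;

       /* Hypothetical utility class; keeps JSON serialization out of Reader. */
       public final class JsonUtils {

           /* ObjectMapper is thread-safe for serialization, so one shared
              instance avoids re-creating a mapper per message. */
           private static final ObjectMapper MAPPER = new ObjectMapper();

           private JsonUtils() {
           }

           public static String toJson(Map<String, Object> message) throws IOException {
               try {
                   return MAPPER.writeValueAsString(message);
               } catch (JsonProcessingException e) {
                   throw new IOException("Failed to convert event message to JSON", e);
               }
           }
       }

   Callers would then use JsonUtils.toJson(message), and Reader would no
   longer need its own Jackson imports.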



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/event/Decoder.java:
##########
@@ -0,0 +1,532 @@
+    /**
+     * Converts a PostgreSQL epoch timestamp to a human-readable date format.
+     *
+     * @param microseconds
+     *                     Microseconds since 2000-01-01 00:00:00.000.
+     * @return String
+     * @throws ParseException
+     *                        if parsing the epoch start date fails
+     */
+    private String getFormattedPostgreSQLEpochDate(long microseconds) throws ParseException {
+        Date pgEpochDate = new SimpleDateFormat("yyyy-MM-dd").parse("2000-01-01");
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(pgEpochDate);
+        cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + (int) (microseconds / 1000000));
+
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z Z").format(cal.getTime());

Review Comment:
   This should be replaced with DateTimeFormatter.
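   A minimal sketch of that replacement, keeping the same output pattern
   (constant names are illustrative):

       import java.time.LocalDateTime;
       import java.time.ZoneId;
       import java.time.ZonedDateTime;
       import java.time.format.DateTimeFormatter;
       import java.time.temporal.ChronoUnit;

       /* PostgreSQL epoch (2000-01-01 00:00:00) expressed as a constant
          instead of being parsed from a string on every call. */
       private static final LocalDateTime POSTGRESQL_EPOCH =
               LocalDateTime.of(2000, 1, 1, 0, 0, 0);

       private static final DateTimeFormatter TIMESTAMP_FORMATTER =
               DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z Z");

       private String getFormattedPostgreSQLEpochDate(long microseconds) {
           // Add the microsecond offset to the epoch, then attach the system
           // zone so the 'z' and 'Z' pattern letters can be rendered.
           ZonedDateTime timestamp = POSTGRESQL_EPOCH
                   .plus(microseconds, ChronoUnit.MICROS)
                   .atZone(ZoneId.systemDefault());
           return TIMESTAMP_FORMATTER.format(timestamp);
       }

   This would also drop ParseException from the method signature, since no
   string parsing is involved.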



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.processors;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.postgresql.event.Reader;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+/**
+ * A processor to capture Change Data Capture (CDC) events from a PostgreSQL
+ * database and return them as flow files.
+ */
+@TriggerSerially
+@PrimaryNodeOnly
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+
+@Tags({ "sql", "jdbc", "cdc", "postgresql" })
+
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. "
+        + "Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files ordered by the time at which the operation occurred. "
+        + "This processor uses Replication Connection to stream data. By default, an existing Logical Replication Slot with the specified name will be used or, "
+        + "if none exists, a new one will be created. In the case of an existing slot, make sure that pgoutput is the output plugin. "
+        + "This processor also uses SQL Connection to query system views. In addition, a Publication in PostgreSQL database should already exist.")
+
+@Stateful(scopes = Scope.CLUSTER, description = "The last received Log Sequence Number (LSN) from Replication Slot is stored by this processor, "
+        + "such that it can continue from the same location if restarted.")
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "cdc.type", description = "The CDC event type, as begin, commit, insert, update, delete, etc."),
+        @WritesAttribute(attribute = "cdc.lsn", description = "The Log Sequence Number (i.e. strictly increasing integer value) specifying the order "
+                + "of the CDC event flow file relative to the other event flow files."),
+        @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to application/json.") })
+
+public class CaptureChangePostgreSQL extends AbstractProcessor {
+    // Constants
+    private static final String MIME_TYPE_DEFAULT = "application/json";
+    private static final String LAST_LSN_STATE_MAP_KEY = "last.received.lsn";
+    private static final String POSTGRESQL_MIN_VERSION = "10";
+    private static final String POSTGRESQL_PORT_DEFAULT = "5432";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successfully created FlowFile from CDC event.").build();
+
+    private Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-class")
+            .displayName("PostgreSQL Driver Class Name")
+            .description("The class name of the PostgreSQL database driver class")
+            .defaultValue("org.postgresql.Driver")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-locations")
+            .displayName("PostgreSQL Driver Location(s)")
+            .description(
+                    "Comma-separated list of files/folders containing the PostgreSQL driver JAR and its dependencies (if any). For example '/usr/share/java/postgresql-42.3.1.jar'")
+            .defaultValue(null)
+            .required(false)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-host")
+            .displayName("PostgreSQL Hostname")
+            .description("The hostname for PostgreSQL connections.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-port")
+            .displayName("PostgreSQL Port")
+            .description("The default port for PostgreSQL connections.")
+            .required(true)
+            .defaultValue(POSTGRESQL_PORT_DEFAULT)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-database")
+            .displayName("Database")
+            .description("Specifies the name of the database to connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-user")
+            .displayName("Username")
+            .description("Username to access the PostgreSQL database. Must be a superuser.")
+            .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-password")
+            .displayName("Password")
+            .description("Password to access the PostgreSQL database.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-max-wait-time").displayName("Max Wait Time")
+            .description(
+                    "The maximum amount of time allowed for a connection to be established, zero means there is effectively no limit.")
+            .defaultValue("30 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-publication")
+            .displayName("Publication Name")
+            .description(
+                    "A group of tables whose data changes are intended to be replicated through Logical Replication. "
+                            + "It should be created in the database before the processor starts.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor REPLICATION_SLOT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-slot-name")
+            .displayName("Replication Slot Name")
+            .description(
+                    "A unique, cluster-wide identifier for the PostgreSQL Replication Slot. "
+                            + "If it already exists, make sure that pgoutput is the output plugin.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DROP_SLOT_IF_EXISTS = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-drop-slot-if-exists")
+            .displayName("Drop if exists replication slot?")
+            .description(
+                    "Drop the Replication Slot in PostgreSQL database if it already exists every processor starts.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_BEGIN_COMMIT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-begin-commit")
+            .displayName("Include BEGIN and COMMIT statements?")
+            .description(
+                    "Specifies whether to emit events including BEGIN and COMMIT statements. Set to true if the BEGIN and COMMIT statements are necessary in the downstream flow, "
+                            + "otherwise set to false, which suppresses these statements and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ALL_METADATA = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-all-metadata")
+            .displayName("Include all metadata?")
+            .description(
+                    "Specifies whether to emit events including all message types (BEGIN, COMMIT, RELATION, TYPE, etc) and all metadata (relation id, tuple type, tuple data before update, etc). "
+                            + "Set to true if all metadata are necessary in the downstream flow, otherwise set to false, which suppresses these metadata and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor START_LSN = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-start-lsn")
+            .displayName("Start Log Sequence Number (LSN)")
+            .description(
+                    "Specifies a start Log Sequence Number (LSN) to use if this processor's state does not have a current "
+                            + "sequence identifier. If a LSN is present in the processor's state, this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .build();
+
+    // Key Attributes
+    private List<PropertyDescriptor> descriptors;
+    private JDBCConnectionHolder queryConnHolder = null;
+    private JDBCConnectionHolder replicationConnHolder = null;
+    private volatile Long lastLSN = null;
+    protected Reader replicationReader = null;
+    protected Long maxFlowFileListSize = 100L;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(DRIVER_NAME);
+        descriptors.add(DRIVER_LOCATION);
+        descriptors.add(HOST);
+        descriptors.add(PORT);
+        descriptors.add(DATABASE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECTION_TIMEOUT);
+        descriptors.add(PUBLICATION);
+        descriptors.add(REPLICATION_SLOT);
+        descriptors.add(DROP_SLOT_IF_EXISTS);
+        descriptors.add(INCLUDE_BEGIN_COMMIT);
+        descriptors.add(INCLUDE_ALL_METADATA);
+        descriptors.add(START_LSN);
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    // Runs the initial configuration of the processor.
+    public void setup(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error(
+                    "Failed to retrieve observed maximum values from the State Manager. Will not attempt connection until this is accomplished.",
+                    ioe);
+            context.yield();
+            return;
+        }
+
+        final String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
+        final String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
+
+        final String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
+                .asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final String publication = context.getProperty(PUBLICATION).evaluateAttributeExpressions().getValue();
+        final String slot = context.getProperty(REPLICATION_SLOT).evaluateAttributeExpressions().getValue();
+        final Boolean dropSlotIfExists = context.getProperty(DROP_SLOT_IF_EXISTS).asBoolean();
+        final Boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        final Boolean includeAllMetadata = context.getProperty(INCLUDE_ALL_METADATA).asBoolean();
+        final Long startLSN = context.getProperty(START_LSN).asLong();
+
+        final String lastLSNState = stateMap.get(LAST_LSN_STATE_MAP_KEY);
+        if (lastLSNState != null && Long.parseLong(lastLSNState) > 0L) {
+            this.lastLSN = Long.parseLong(lastLSNState);
+        } else if (startLSN != null) {
+            this.lastLSN = startLSN;
+        }
+
+        try {
+            this.connect(host, port, database, username, password, driverLocation, driverName, connectionTimeout);
+            this.createReplicationReader(slot, dropSlotIfExists, publication, lastLSN, includeBeginCommit,
+                    includeAllMetadata, this.getReplicationConnection(), this.getQueryConnection());
+
+        } catch (final IOException | TimeoutException | SQLException | RuntimeException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }
+
+        try {
+            List<FlowFile> listFlowFiles = new ArrayList<>();
+
+            while (listFlowFiles.size() < this.getMaxFlowFileListSize()) {
+                HashMap<String, Object> message = this.replicationReader.readMessage();
+
+                if (message == null) { // No more messages.
+                    break;
+                }
+
+                if (message.isEmpty()) { // Skip empty messages.
+                    continue;
+                }
+
+                String data = this.replicationReader.convertMessageToJSON(message);
+
+                FlowFile flowFile = session.create();
+                flowFile = session.write(flowFile, out -> out.write(data.getBytes(StandardCharsets.UTF_8)));
+
+                flowFile = session.putAttribute(flowFile, "cdc.type", message.get("type").toString());
+
+                // Some messages don't have an LSN (e.g., Relation).
+                if (message.containsKey("lsn")) {
+                    flowFile = session.putAttribute(flowFile, "cdc.lsn", message.get("lsn").toString());
+                }
+
+                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_DEFAULT);
+
+                listFlowFiles.add(flowFile);
+            }
+
+            session.transfer(listFlowFiles, REL_SUCCESS);
+
+            this.lastLSN = this.replicationReader.getLastReceiveLSN();
+
+            // Feedback is sent after the flowfiles transfer completes.
+            this.replicationReader.sendFeedback(this.lastLSN);
+
+            updateState(context.getStateManager());

Review Comment:
   The current ProcessSession implementation does not actually transfer FlowFiles until the Session is committed. It looks like this should be adjusted to use ProcessSession.commitAsync() with a callback to send feedback and update the state after a successful commit.

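   As a rough sketch of that approach (assuming `sendFeedback` and `updateState` keep their current signatures, and logging rather than rethrowing checked exceptions inside the callback):

```java
session.transfer(listFlowFiles, REL_SUCCESS);

final Long receivedLSN = this.replicationReader.getLastReceiveLSN();
session.commitAsync(() -> {
    // Acknowledge the LSN and persist state only after the framework
    // has successfully committed the session.
    this.lastLSN = receivedLSN;
    try {
        this.replicationReader.sendFeedback(receivedLSN);
        updateState(context.getStateManager());
    } catch (final Exception e) {
        getLogger().error("Failed to send feedback and update state after commit", e);
    }
});
```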


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }
+
+        try {
+            List<FlowFile> listFlowFiles = new ArrayList<>();
+
+            while (listFlowFiles.size() < this.getMaxFlowFileListSize()) {
+                HashMap<String, Object> message = this.replicationReader.readMessage();
+
+                if (message == null) { // No more messages.
+                    break;
+                }
+
+                if (message.isEmpty()) { // Skip empty messages.
+                    continue;
+                }
+
+                String data = this.replicationReader.convertMessageToJSON(message);

Review Comment:
   Rather than hard-coding this implementation to JSON, this would be a good opportunity to introduce a RecordWriter Service property. That would support writing messages as JSON, or any number of other supported Record formats. Using a record-oriented approach would also make the flow design much more efficient, allowing multiple messages to be packed into a single FlowFile.

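   For example, something along these lines could work (a sketch only; the property name is illustrative, and `RecordSetWriterFactory` is the standard record-writing service interface from `nifi-record-serialization-service-api`):

```java
import org.apache.nifi.serialization.RecordSetWriterFactory;

public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("cdc-postgresql-record-writer") // illustrative name, not part of this PR
        .displayName("Record Writer")
        .description("Record Writer to use for serializing CDC events. A record-oriented "
                + "writer also allows multiple events to be packed into a single FlowFile.")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build();
```

   The `onTrigger` loop could then write each message as a Record through a single `RecordSetWriter`, instead of creating one JSON FlowFile per message.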


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }

Review Comment:
   This null check and call to `setup()` does not appear to be thread safe. Did you consider annotating the `setup()` method with `@OnScheduled` instead of this approach?

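   For example (a sketch only; the method body would stay as it is today):

```java
import org.apache.nifi.annotation.lifecycle.OnScheduled;

// Invoked by the framework once when the processor is scheduled,
// before any onTrigger call.
@OnScheduled
public void setup(final ProcessContext context) {
    // existing state lookup, connection setup, and replication reader creation
}
```

   The check in `onTrigger` could then be reduced to re-initializing only when the replication stream has been dropped mid-run.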


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.processors;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.postgresql.event.Reader;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+/**
+ * A processor to Capture Data Change (CDC) events from a PostgreSQL database
+ * and return them as flow files.
+ */
+@TriggerSerially
+@PrimaryNodeOnly
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+
+@Tags({ "sql", "jdbc", "cdc", "postgresql" })
+
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. "
+        + "Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files ordered by the time at which the operation occurred. "
+        + "This processor uses Replication Connection to stream data. By default, an existing Logical Replication Slot with the specified name will be used or, "
+        + "if none exists, a new one will be created. In the case of an existing slot, make sure that pgoutput is the output plugin. "
+        + "This processor also uses SQL Connection to query system views. In addition, a Publication in PostgreSQL database should already exist.")
+
+@Stateful(scopes = Scope.CLUSTER, description = "The last received Log Sequence Number (LSN) from Replication Slot is stored by this processor, "
+        + "such that it can continue from the same location if restarted.")
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "cdc.type", description = "The CDC event type, as begin, commit, insert, update, delete, etc."),
+        @WritesAttribute(attribute = "cdc.lsn", description = "The Log Sequence Number (i.e. strictly increasing integer value) specifying the order "
+                + "of the CDC event flow file relative to the other event flow files."),
+        @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to application/json.") })
+
+public class CaptureChangePostgreSQL extends AbstractProcessor {
+    // Constants
+    private static final String MIME_TYPE_DEFAULT = "application/json";
+    private static final String LAST_LSN_STATE_MAP_KEY = "last.received.lsn";
+    private static final String POSTGRESQL_MIN_VERSION = "10";
+    private static final String POSTGRESQL_PORT_DEFAULT = "5432";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successfully created FlowFile from CDC event.").build();
+
+    private Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-class")
+            .displayName("PostgreSQL Driver Class Name")
+            .description("The class name of the PostgreSQL database driver class")
+            .defaultValue("org.postgresql.Driver")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-locations")
+            .displayName("PostgreSQL Driver Location(s)")
+            .description(
+                    "Comma-separated list of files/folders containing the PostgreSQL driver JAR and its dependencies (if any). For example '/usr/share/java/postgresql-42.3.1.jar'")
+            .defaultValue(null)
+            .required(false)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-host")
+            .displayName("PostgreSQL Hostname")
+            .description("The hostname for PostgreSQL connections.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-port")
+            .displayName("PostgreSQL Port")
+            .description("The default port for PostgreSQL connections.")
+            .required(true)
+            .defaultValue(POSTGRESQL_PORT_DEFAULT)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-database")
+            .displayName("Database")
+            .description("Specifies the name of the database to connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-user")
+            .displayName("Username")
+            .description("Username to access the PostgreSQL database. Must be a superuser.")
+            .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-password")
+            .displayName("Password")
+            .description("Password to access the PostgreSQL database.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-max-wait-time").displayName("Max Wait Time")
+            .description(
+                    "The maximum amount of time allowed for a connection to be established, zero means there is effectively no limit.")
+            .defaultValue("30 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-publication")
+            .displayName("Publication Name")
+            .description(
+                    "A group of tables whose data changes are intended to be replicated through Logical Replication. "
+                            + "It should be created in the database before the processor starts.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor REPLICATION_SLOT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-slot-name")
+            .displayName("Replication Slot Name")
+            .description(
+                    "A unique, cluster-wide identifier for the PostgreSQL Replication Slot. "
+                            + "If it already exists, make sure that pgoutput is the output plugin.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DROP_SLOT_IF_EXISTS = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-drop-slot-if-exists")
+            .displayName("Drop if exists replication slot?")
+            .description(
+                    "Drop the Replication Slot in PostgreSQL database if it already exists every processor starts.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_BEGIN_COMMIT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-begin-commit")
+            .displayName("Include BEGIN and COMMIT statements?")
+            .description(
+                    "Specifies whether to emit events including BEGIN and COMMIT statements. Set to true if the BEGIN and COMMIT statements are necessary in the downstream flow, "
+                            + "otherwise set to false, which suppresses these statements and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ALL_METADATA = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-all-metadata")
+            .displayName("Include all metadata?")
+            .description(
+                    "Specifies whether to emit events including all message types (BEGIN, COMMIT, RELATION, TYPE, etc) and all metadata (relation id, tuple type, tuple data before update, etc). "
+                            + "Set to true if all metadata are necessary in the downstream flow, otherwise set to false, which suppresses these metadata and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor START_LSN = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-start-lsn")
+            .displayName("Start Log Sequence Number (LSN)")
+            .description(
+                    "Specifies a start Log Sequence Number (LSN) to use if this processor's state does not have a current "
+                            + "sequence identifier. If an LSN is present in the processor's state, this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .build();
+
+    // Key Attributes
+    private List<PropertyDescriptor> descriptors;
+    private JDBCConnectionHolder queryConnHolder = null;
+    private JDBCConnectionHolder replicationConnHolder = null;
+    private volatile Long lastLSN = null;
+    protected Reader replicationReader = null;
+    protected Long maxFlowFileListSize = 100L;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(DRIVER_NAME);
+        descriptors.add(DRIVER_LOCATION);
+        descriptors.add(HOST);
+        descriptors.add(PORT);
+        descriptors.add(DATABASE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECTION_TIMEOUT);
+        descriptors.add(PUBLICATION);
+        descriptors.add(REPLICATION_SLOT);
+        descriptors.add(DROP_SLOT_IF_EXISTS);
+        descriptors.add(INCLUDE_BEGIN_COMMIT);
+        descriptors.add(INCLUDE_ALL_METADATA);
+        descriptors.add(START_LSN);
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    // Runs the initial configuration of processor.
+    public void setup(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error(
+                    "Failed to retrieve observed maximum values from the State Manager. Will not attempt connection until this is accomplished.",
+                    ioe);
+            context.yield();
+            return;
+        }
+
+        final String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
+        final String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
+
+        final String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
+                .asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final String publication = context.getProperty(PUBLICATION).evaluateAttributeExpressions().getValue();
+        final String slot = context.getProperty(REPLICATION_SLOT).evaluateAttributeExpressions().getValue();
+        final Boolean dropSlotIfExists = context.getProperty(DROP_SLOT_IF_EXISTS).asBoolean();
+        final Boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        final Boolean includeAllMetadata = context.getProperty(INCLUDE_ALL_METADATA).asBoolean();
+        final Long startLSN = context.getProperty(START_LSN).asLong();
+
+        final String lastLsnFromState = stateMap.get(LAST_LSN_STATE_MAP_KEY);
+        if (lastLsnFromState != null && Long.parseLong(lastLsnFromState) > 0L) {
+            this.lastLSN = Long.parseLong(lastLsnFromState);
+        } else if (startLSN != null) {
+            this.lastLSN = startLSN;
+        }
+
+        try {
+            this.connect(host, port, database, username, password, driverLocation, driverName, connectionTimeout);
+            this.createReplicationReader(slot, dropSlotIfExists, publication, lastLSN, includeBeginCommit,
+                    includeAllMetadata, this.getReplicationConnection(), this.getQueryConnection());
+
+        } catch (final IOException | TimeoutException | SQLException | RuntimeException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }
+
+        try {
+            List<FlowFile> listFlowFiles = new ArrayList<>();
+
+            while (listFlowFiles.size() < this.getMaxFlowFileListSize()) {
+                HashMap<String, Object> message = this.replicationReader.readMessage();
+
+                if (message == null) { // No more messages.
+                    break;
+                }
+
+                if (message.isEmpty()) { // Skip empty messages.
+                    continue;
+                }
+
+                String data = this.replicationReader.convertMessageToJSON(message);
+
+                FlowFile flowFile = session.create();
+
+                flowFile = session.write(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        out.write(data.getBytes(StandardCharsets.UTF_8));
+                    }
+                });
+
+                flowFile = session.putAttribute(flowFile, "cdc.type", message.get("type").toString());
+
+                // Some messages don't have an LSN (e.g. Relation).
+                if (message.containsKey("lsn")) {
+                    flowFile = session.putAttribute(flowFile, "cdc.lsn", message.get("lsn").toString());
+                }
+
+                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_DEFAULT);
+
+                listFlowFiles.add(flowFile);
+            }
+
+            session.transfer(listFlowFiles, REL_SUCCESS);
+
+            this.lastLSN = this.replicationReader.getLastReceiveLSN();
+
+            // Feedback is sent after the flowfiles transfer completes.
+            this.replicationReader.sendFeedback(this.lastLSN);
+
+            updateState(context.getStateManager());
+
+        } catch (SQLException | IOException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    // Update last LSN received on processor state.
+    private void updateState(StateManager stateManager) throws IOException {
+        if (stateManager != null) {
+            Map<String, String> newStateMap = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
+
+            newStateMap.put(LAST_LSN_STATE_MAP_KEY, String.valueOf(this.lastLSN));
+            stateManager.setState(newStateMap, Scope.CLUSTER);
+        }
+    }
+
+    protected void stop(StateManager stateManager) throws CDCException {
+        try {
+            this.replicationReader = null;
+
+            if (lastLSN != null) {
+                updateState(stateManager);
+            }
+
+            if (queryConnHolder != null) {
+                queryConnHolder.close();
+            }
+
+            if (replicationConnHolder != null) {
+                replicationConnHolder.close();
+            }
+        } catch (Exception e) {
+            throw new CDCException("Closing CDC Connection failed", e);
+        }
+    }
+
+    @OnStopped
+    public void onStopped(ProcessContext context) {
+        try {
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown(ProcessContext context) {
+        try {
+            // In case we get shutdown while still running, save off the current state,
+            // disconnect, and shut down gracefully.
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    protected void createReplicationReader(String slot, boolean dropSlotIfExists, String publication, Long lsn,
+            boolean includeBeginCommit, boolean includeAllMetadata, Connection replicationConn,
+            Connection queryConn) throws SQLException {
+        this.replicationReader = new Reader(slot, dropSlotIfExists, publication, lsn, includeBeginCommit,
+                includeAllMetadata, replicationConn, queryConn);
+    }
+
+    protected Long getMaxFlowFileListSize() {
+        return this.maxFlowFileListSize;
+    }
+
+    protected void connect(String host, String port, String database, String username, String password,
+            String driverLocation, String driverName, long connectionTimeout)
+            throws IOException, TimeoutException {
+        try {
+            // Ensure driverLocation and driverName are correct
+            // before establishing connection.
+            registerDriver(driverLocation, driverName);
+        } catch (InitializationException e) {
+            throw new RuntimeException(
+                    "Failed to register JDBC driver. Ensure PostgreSQL Driver Location(s) and PostgreSQL Driver Class Name "
+                            + "are configured correctly",
+                    e);
+        }
+
+        // Connection expects a non-null password.
+        if (password == null) {
+            password = "";
+        }
+
+        // Connection expects a timeout.
+        if (connectionTimeout == 0) {
+            connectionTimeout = Long.MAX_VALUE;
+        }
+
+        InetSocketAddress address = getAddress(host, port);
+
+        queryConnHolder = new JDBCConnectionHolder(address, database, username, password, false, connectionTimeout);
+        try {
+            // Ensure Query connection can be created.
+            this.getQueryConnection();
+        } catch (SQLException e) {
+            throw new IOException("Failed to create SQL connection to specified host and port", e);
+        }
+
+        replicationConnHolder = new JDBCConnectionHolder(address, database, username, password, true,
+                connectionTimeout);
+        try {
+            // Ensure Replication connection can be created.
+            this.getReplicationConnection();
+        } catch (SQLException e) {
+            throw new IOException("Failed to create Replication connection to specified host and port", e);
+        }
+    }
+
+    protected Connection getQueryConnection() throws SQLException {
+        return queryConnHolder.getConnection();
+    }
+
+    protected Connection getReplicationConnection() throws SQLException {
+        return replicationConnHolder.getConnection();
+    }
+
+    protected void closeQueryConnection() {
+        if (queryConnHolder != null)
+            queryConnHolder.close();
+    }
+
+    protected void closeReplicationConnection() {
+        if (replicationConnHolder != null)
+            replicationConnHolder.close();
+    }
+
+    /**
+     * Ensures that the driver is loaded using the ClassLoader for your NAR.
+     *
+     * @param driverLocation
+     *                       Driver's location.
+     *
+     * @param driverName
+     *                       Driver's class name.
+     *
+     * @throws InitializationException
+     *                                 if there is a problem obtaining the
+     *                                 ClassLoader
+     **/
+    protected void registerDriver(String driverLocation, String driverName) throws InitializationException {
+        if (driverLocation != null && driverLocation.length() > 0) {
+            try {
+                // Split and trim the entries
+                final ClassLoader classLoader = ClassLoaderUtils.getCustomClassLoader(driverLocation,
+                        this.getClass().getClassLoader(), (dir, name) -> name != null && name.endsWith(".jar"));
+
+                final Class<?> clazz = Class.forName(driverName, true, classLoader);
+                if (clazz == null) {
+                    throw new InitializationException("Can't load Database Driver " + driverName);
+                }
+                final Driver driver = (Driver) clazz.getDeclaredConstructor().newInstance();
+                DriverManager.registerDriver(new DriverShim(driver));
+
+            } catch (final InitializationException e) {
+                throw e;
+            } catch (final MalformedURLException e) {
+                throw new InitializationException("Invalid Database Driver Jar Url", e);
+            } catch (final Exception e) {
+                throw new InitializationException("Can't load Database Driver", e);
+            }
+        }
+    }
+
+    /**
+     * Returns the Address from HOST and PORT properties.
+     *
+     * @param host
+     *             Hostname of PostgreSQL Cluster.
+     * @param port
+     *             Port that PostgreSQL Cluster is listening on.
+     *
+     * @return InetSocketAddress
+     */
+    protected InetSocketAddress getAddress(String host, String port) {
+        if (host == null || host.trim().isEmpty()) {
+            return null;
+        }
+
+        if (port == null || port.trim().isEmpty()) {
+            port = POSTGRESQL_PORT_DEFAULT;
+        }
+
+        return new InetSocketAddress(host.trim(), Integer.parseInt(port.trim()));
+    }
+
+    private class JDBCConnectionHolder {
+        private final String connectionUrl;
+        private final Properties connectionProps = new Properties();
+        private final long connectionTimeoutMillis;
+
+        private Connection connection;
+        private boolean isReplicationConnection = false;
+
+        private JDBCConnectionHolder(InetSocketAddress address, String database, String username, String password,
+                boolean isReplicationConnection, long connectionTimeoutMillis) {
+            this.connectionUrl = "jdbc:postgresql://" + address.getHostString() + ":" + address.getPort() + "/"
+                    + database;
+            this.connectionTimeoutMillis = connectionTimeoutMillis;
+
+            if (isReplicationConnection) {
+                this.isReplicationConnection = isReplicationConnection;
+                this.connectionProps.put("assumeMinServerVersion", POSTGRESQL_MIN_VERSION);
+                this.connectionProps.put("replication", "database");
+                this.connectionProps.put("preferQueryMode", "simple");
+            }
+
+            this.connectionProps.put("user", username);
+            this.connectionProps.put("password", password);
+        }
+
+        private Connection getConnection() throws SQLException {
+            // Cap the validation timeout to avoid int overflow when the configured timeout is effectively unlimited.
+            if (connection != null && connection.isValid((int) Math.min(connectionTimeoutMillis / 1000, Integer.MAX_VALUE))) {
+                getLogger().trace("Returning the pooled JDBC connection.");
+                return connection;
+            }
+
+            // Close the existing connection just in case.
+            close();
+
+            getLogger().trace("Creating a new JDBC connection.");
+            connection = DriverManager.getConnection(connectionUrl, connectionProps);
+
+            // Set auto commit for query connection.
+            if (!isReplicationConnection) {
+                connection.setAutoCommit(true);
+            }
+
+            return connection;
+        }
+
+        private void close() {
+            if (connection != null) {
+                try {
+                    getLogger().trace("Closing the pooled JDBC connection.");
+                    connection.close();
+                } catch (SQLException e) {
+                    getLogger().warn("Failed to close JDBC connection due to " + e, e);
+                }
+            }
+        }
+    }
+
+    private static class DriverShim implements Driver {

Review Comment:
   Is there a particular reason for using this DriverShim wrapper?
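
   If the intent is the standard workaround for `DriverManager` refusing to use `Driver` implementations loaded by a classloader other than the caller's, a short comment to that effect would help future maintainers. For reference, the usual delegation pattern looks roughly like the sketch below (the class name `DelegatingDriver` and its layout are illustrative assumptions, not necessarily this PR's exact code):

   ```java
   // Illustrative delegating-driver sketch: DriverManager.getConnection only considers
   // drivers visible to the calling classloader, so the shim (loaded by the processor's
   // classloader) forwards every java.sql.Driver method to the wrapped instance.
   import java.sql.Connection;
   import java.sql.Driver;
   import java.sql.DriverPropertyInfo;
   import java.sql.SQLException;
   import java.sql.SQLFeatureNotSupportedException;
   import java.util.Properties;
   import java.util.logging.Logger;

   final class DelegatingDriver implements Driver {
       private final Driver delegate;

       DelegatingDriver(final Driver delegate) {
           this.delegate = delegate;
       }

       @Override
       public Connection connect(final String url, final Properties info) throws SQLException {
           return delegate.connect(url, info);
       }

       @Override
       public boolean acceptsURL(final String url) throws SQLException {
           return delegate.acceptsURL(url);
       }

       @Override
       public DriverPropertyInfo[] getPropertyInfo(final String url, final Properties info) throws SQLException {
           return delegate.getPropertyInfo(url, info);
       }

       @Override
       public int getMajorVersion() {
           return delegate.getMajorVersion();
       }

       @Override
       public int getMinorVersion() {
           return delegate.getMinorVersion();
       }

       @Override
       public boolean jdbcCompliant() {
           return delegate.jdbcCompliant();
       }

       @Override
       public Logger getParentLogger() throws SQLFeatureNotSupportedException {
           return delegate.getParentLogger();
       }
   }
   ```

   The registration call in this PR, `DriverManager.registerDriver(new DriverShim(driver))`, matches that pattern: the shim is loaded by the processor's classloader, so `DriverManager` will hand it out, and it forwards all calls to the actual driver.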



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java:
##########
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cdc.postgresql.processors;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.postgresql.event.Reader;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+/**
+ * A processor to Capture Data Change (CDC) events from a PostgreSQL database
+ * and return them as flow files.
+ */
+@TriggerSerially
+@PrimaryNodeOnly
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+
+@Tags({ "sql", "jdbc", "cdc", "postgresql" })
+
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. "
+        + "Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files ordered by the time at which the operation occurred. "
+        + "This processor uses Replication Connection to stream data. By default, an existing Logical Replication Slot with the specified name will be used or, "
+        + "if none exists, a new one will be created. In the case of an existing slot, make sure that pgoutput is the output plugin. "
+        + "This processor also uses SQL Connection to query system views. In addition, a Publication in PostgreSQL database should already exist.")
+
+@Stateful(scopes = Scope.CLUSTER, description = "The last received Log Sequence Number (LSN) from Replication Slot is stored by this processor, "
+        + "such that it can continue from the same location if restarted.")
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "cdc.type", description = "The CDC event type, as begin, commit, insert, update, delete, etc."),
+        @WritesAttribute(attribute = "cdc.lsn", description = "The Log Sequence Number (i.e. strictly increasing integer value) specifying the order "
+                + "of the CDC event flow file relative to the other event flow files."),
+        @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to application/json.") })
+
+public class CaptureChangePostgreSQL extends AbstractProcessor {
+    // Constants
+    private static final String MIME_TYPE_DEFAULT = "application/json";
+    private static final String LAST_LSN_STATE_MAP_KEY = "last.received.lsn";
+    private static final String POSTGRESQL_MIN_VERSION = "10";
+    private static final String POSTGRESQL_PORT_DEFAULT = "5432";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successfully created FlowFile from CDC event.").build();
+
+    private Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-class")
+            .displayName("PostgreSQL Driver Class Name")
+            .description("The class name of the PostgreSQL database driver class")
+            .defaultValue("org.postgresql.Driver")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-driver-locations")
+            .displayName("PostgreSQL Driver Location(s)")
+            .description(
+                    "Comma-separated list of files/folders containing the PostgreSQL driver JAR and its dependencies (if any). For example '/usr/share/java/postgresql-42.3.1.jar'")
+            .defaultValue(null)
+            .required(false)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-host")
+            .displayName("PostgreSQL Hostname")
+            .description("The hostname for PostgreSQL connections.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-port")
+            .displayName("PostgreSQL Port")
+            .description("The default port for PostgreSQL connections.")
+            .required(true)
+            .defaultValue(POSTGRESQL_PORT_DEFAULT)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-database")
+            .displayName("Database")
+            .description("Specifies the name of the database to connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-user")
+            .displayName("Username")
+            .description("Username to access the PostgreSQL database. Must be a superuser.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-password")
+            .displayName("Password")
+            .description("Password to access the PostgreSQL database.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-max-wait-time").displayName("Max Wait Time")
+            .description(
+                    "The maximum amount of time allowed for a connection to be established, zero means there is effectively no limit.")
+            .defaultValue("30 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-publication")
+            .displayName("Publication Name")
+            .description(
+                    "A group of tables whose data changes are intended to be replicated through Logical Replication. "
+                            + "It should be created in the database before the processor starts.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor REPLICATION_SLOT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-slot-name")
+            .displayName("Replication Slot Name")
+            .description(
+                    "A unique, cluster-wide identifier for the PostgreSQL Replication Slot. "
+                            + "If it already exists, make sure that pgoutput is the output plugin.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor DROP_SLOT_IF_EXISTS = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-drop-slot-if-exists")
+            .displayName("Drop if exists replication slot?")
+            .description(
+                    "Drop the Replication Slot in PostgreSQL database if it already exists every processor starts.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_BEGIN_COMMIT = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-begin-commit")
+            .displayName("Include BEGIN and COMMIT statements?")
+            .description(
+                    "Specifies whether to emit events including BEGIN and COMMIT statements. Set to true if the BEGIN and COMMIT statements are necessary in the downstream flow, "
+                            + "otherwise set to false, which suppresses these statements and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ALL_METADATA = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-include-all-metadata")
+            .displayName("Include all metadata?")
+            .description(
+                    "Specifies whether to emit events including all message types (BEGIN, COMMIT, RELATION, TYPE, etc) and all metadata (relation id, tuple type, tuple data before update, etc). "
+                            + "Set to true if all metadata are necessary in the downstream flow, otherwise set to false, which suppresses these metadata and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor START_LSN = new PropertyDescriptor.Builder()
+            .name("cdc-postgresql-start-lsn")
+            .displayName("Start Log Sequence Number (LSN)")
+            .description(
+                    "Specifies a start Log Sequence Number (LSN) to use if this processor's state does not have a current "
+                            + "sequence identifier. If an LSN is present in the processor's state, this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .build();
+
+    // Key Attributes
+    private List<PropertyDescriptor> descriptors;
+    private JDBCConnectionHolder queryConnHolder = null;
+    private JDBCConnectionHolder replicationConnHolder = null;
+    private volatile Long lastLSN = null;
+    protected Reader replicationReader = null;
+    protected Long maxFlowFileListSize = 100L;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(DRIVER_NAME);
+        descriptors.add(DRIVER_LOCATION);
+        descriptors.add(HOST);
+        descriptors.add(PORT);
+        descriptors.add(DATABASE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECTION_TIMEOUT);
+        descriptors.add(PUBLICATION);
+        descriptors.add(REPLICATION_SLOT);
+        descriptors.add(DROP_SLOT_IF_EXISTS);
+        descriptors.add(INCLUDE_BEGIN_COMMIT);
+        descriptors.add(INCLUDE_ALL_METADATA);
+        descriptors.add(START_LSN);
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    // Runs the initial configuration of processor.
+    public void setup(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error(
+                    "Failed to retrieve observed maximum values from the State Manager. Will not attempt connection until this is accomplished.",
+                    ioe);
+            context.yield();
+            return;
+        }
+
+        final String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
+        final String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
+
+        final String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
+                .asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final String publication = context.getProperty(PUBLICATION).evaluateAttributeExpressions().getValue();
+        final String slot = context.getProperty(REPLICATION_SLOT).evaluateAttributeExpressions().getValue();
+        final Boolean dropSlotIfExists = context.getProperty(DROP_SLOT_IF_EXISTS).asBoolean();
+        final Boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        final Boolean includeAllMetadata = context.getProperty(INCLUDE_ALL_METADATA).asBoolean();
+        final Long startLSN = context.getProperty(START_LSN).asLong();
+
+        final String lastLsnFromState = stateMap.get(LAST_LSN_STATE_MAP_KEY);
+        if (lastLsnFromState != null && Long.parseLong(lastLsnFromState) > 0L) {
+            this.lastLSN = Long.parseLong(lastLsnFromState);
+        } else if (startLSN != null) {
+            this.lastLSN = startLSN;
+        }
+
+        try {
+            this.connect(host, port, database, username, password, driverLocation, driverName, connectionTimeout);
+            this.createReplicationReader(slot, dropSlotIfExists, publication, lastLSN, includeBeginCommit,
+                    includeAllMetadata, this.getReplicationConnection(), this.getQueryConnection());
+
+        } catch (final IOException | TimeoutException | SQLException | RuntimeException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
+            setup(context);
+        }
+
+        try {
+            List<FlowFile> listFlowFiles = new ArrayList<>();
+
+            while (listFlowFiles.size() < this.getMaxFlowFileListSize()) {
+                HashMap<String, Object> message = this.replicationReader.readMessage();
+
+                if (message == null) { // No more messages.
+                    break;
+                }
+
+                if (message.isEmpty()) { // Skip empty messages.
+                    continue;
+                }
+
+                String data = this.replicationReader.convertMessageToJSON(message);
+
+                FlowFile flowFile = session.create();
+
+                flowFile = session.write(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        out.write(data.getBytes(StandardCharsets.UTF_8));
+                    }
+                });
+
+                flowFile = session.putAttribute(flowFile, "cdc.type", message.get("type").toString());
+
+                // Some messages don't have an LSN (e.g. Relation).
+                if (message.containsKey("lsn")) {
+                    flowFile = session.putAttribute(flowFile, "cdc.lsn", message.get("lsn").toString());
+                }
+
+                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_DEFAULT);
+
+                listFlowFiles.add(flowFile);
+            }
+
+            session.transfer(listFlowFiles, REL_SUCCESS);
+
+            this.lastLSN = this.replicationReader.getLastReceiveLSN();
+
+            // Feedback is sent after the flowfiles transfer completes.
+            this.replicationReader.sendFeedback(this.lastLSN);
+
+            updateState(context.getStateManager());
+
+        } catch (SQLException | IOException e) {
+            context.yield();
+            this.closeQueryConnection();
+            this.closeReplicationConnection();
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
+
+    // Update last LSN received on processor state.
+    private void updateState(StateManager stateManager) throws IOException {
+        if (stateManager != null) {
+            Map<String, String> newStateMap = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
+
+            newStateMap.put(LAST_LSN_STATE_MAP_KEY, String.valueOf(this.lastLSN));
+            stateManager.setState(newStateMap, Scope.CLUSTER);
+        }
+    }
+
+    protected void stop(StateManager stateManager) throws CDCException {
+        try {
+            this.replicationReader = null;
+
+            if (lastLSN != null) {
+                updateState(stateManager);
+            }
+
+            if (queryConnHolder != null) {
+                queryConnHolder.close();
+            }
+
+            if (replicationConnHolder != null) {
+                replicationConnHolder.close();
+            }
+        } catch (Exception e) {
+            throw new CDCException("Closing CDC Connection failed", e);
+        }
+    }
+
+    @OnStopped
+    public void onStopped(ProcessContext context) {
+        try {
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown(ProcessContext context) {
+        try {
+            // In case we get shutdown while still running, save off the current state,
+            // disconnect, and shut down gracefully.
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }

Review Comment:
   Is there a reason for this method in addition to `OnStopped`? As indicated in the documentation, it is not guaranteed that this method will be called, and in general `OnStopped` should be sufficient since it is doing the same thing.
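
   If the only work is persisting the last LSN and closing both connections, relying on `@OnStopped` alone seems sufficient, since processors are also stopped during a normal shutdown. A rough sketch of the consolidated lifecycle method, reusing this class's existing `stop`, `CDCException`, and `ProcessException`:

   ```java
   // Sketch: a single @OnStopped lifecycle method; @OnShutdown is not guaranteed
   // to be invoked and would duplicate the same cleanup here.
   @OnStopped
   public void onStopped(final ProcessContext context) {
       try {
           // Persists the last received LSN and closes both connections.
           stop(context.getStateManager());
       } catch (CDCException e) {
           throw new ProcessException(e);
       }
   }
   ```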



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org