Posted to commits@nifi.apache.org by tu...@apache.org on 2022/10/11 19:53:44 UTC

[nifi] branch main updated: NIFI-10442: Create PutIceberg processor

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e87bced147 NIFI-10442: Create PutIceberg processor
e87bced147 is described below

commit e87bced14775e19208617ad48c4ec20b98f8699f
Author: Mark Bathori <ba...@gmail.com>
AuthorDate: Tue Sep 6 10:06:17 2022 +0200

    NIFI-10442: Create PutIceberg processor
    
    This closes #6368.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 nifi-assembly/pom.xml                              |  26 +
 .../serialization/record/util/DataTypeUtils.java   |   6 +-
 .../nifi-iceberg-processors-nar/pom.xml            |  49 ++
 .../src/main/resources/META-INF/LICENSE            | 222 +++++++
 .../src/main/resources/META-INF/NOTICE             | 505 ++++++++++++++
 .../nifi-iceberg-processors/pom.xml                | 175 +++++
 .../iceberg/AbstractIcebergProcessor.java          | 125 ++++
 .../apache/nifi/processors/iceberg/PutIceberg.java | 260 ++++++++
 .../iceberg/converter/ArrayElementGetter.java      | 115 ++++
 .../iceberg/converter/DataConverter.java           |  24 +
 .../iceberg/converter/GenericDataConverters.java   | 230 +++++++
 .../iceberg/converter/IcebergRecordConverter.java  | 190 ++++++
 .../iceberg/converter/RecordFieldGetter.java       | 122 ++++
 .../iceberg/writer/IcebergPartitionedWriter.java   |  50 ++
 .../iceberg/writer/IcebergTaskWriterFactory.java   |  66 ++
 .../services/org.apache.nifi.processor.Processor   |  16 +
 .../nifi/processors/iceberg/TestFileAbort.java     | 108 +++
 .../iceberg/TestIcebergRecordConverter.java        | 706 ++++++++++++++++++++
 .../iceberg/TestPutIcebergWithHadoopCatalog.java   | 206 ++++++
 .../iceberg/TestPutIcebergWithHiveCatalog.java     | 276 ++++++++
 .../iceberg/catalog/TestHadoopCatalogService.java  |  51 ++
 .../iceberg/catalog/TestHiveCatalogService.java    |  58 ++
 .../iceberg/metastore/MetastoreCore.java           | 184 +++++
 .../processors/iceberg/metastore/ScriptRunner.java |  76 +++
 .../iceberg/metastore/ThriftMetastore.java         |  50 ++
 .../processors/iceberg/util/IcebergTestUtils.java  | 134 ++++
 .../src/test/resources/date.avsc                   |  44 ++
 .../src/test/resources/hive-schema-3.2.0.derby.sql | 738 +++++++++++++++++++++
 .../src/test/resources/user.avsc                   |  26 +
 .../nifi-iceberg-services-api-nar/pom.xml          |  43 ++
 .../src/main/resources/META-INF/LICENSE            | 209 ++++++
 .../src/main/resources/META-INF/NOTICE             | 501 ++++++++++++++
 .../nifi-iceberg-services-api/pom.xml              | 149 +++++
 .../services/iceberg/IcebergCatalogService.java    |  32 +
 .../nifi-iceberg-services-nar/pom.xml              |  43 ++
 .../src/main/resources/META-INF/LICENSE            | 209 ++++++
 .../src/main/resources/META-INF/NOTICE             |  17 +
 .../nifi-iceberg-services/pom.xml                  |  43 ++
 .../services/iceberg/AbstractCatalogService.java   |  67 ++
 .../services/iceberg/HadoopCatalogService.java     |  77 +++
 .../nifi/services/iceberg/HiveCatalogService.java  | 140 ++++
 .../org.apache.nifi.controller.ControllerService   |  17 +
 nifi-nar-bundles/nifi-iceberg-bundle/pom.xml       |  44 ++
 nifi-nar-bundles/pom.xml                           |   1 +
 44 files changed, 6429 insertions(+), 1 deletion(-)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 3f56380639..599873016c 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1528,5 +1528,31 @@ language governing permissions and limitations under the License. -->
                 </plugins>
             </build>
         </profile>
+        <profile>
+            <id>include-iceberg</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-iceberg-processors-nar</artifactId>
+                    <version>1.19.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-iceberg-services-api-nar</artifactId>
+                    <version>1.19.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-iceberg-services-nar</artifactId>
+                    <version>1.19.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+            </dependencies>
+        </profile>
     </profiles>
 </project>
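
The new Iceberg NARs are not part of the default assembly; they are bundled only when the include-iceberg profile above is activated. Assuming a standard Maven build of the NiFi source tree, an invocation along these lines would include them (illustrative command; exact flags may vary):

    mvn clean install -P include-iceberg -DskipTests
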
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 21e3b71827..5000c78fe1 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -236,11 +236,15 @@ public class DataTypeUtils {
         return null;
     }
 
-    private static Object toUUID(Object value) {
+    public static UUID toUUID(Object value) {
         if (value == null) {
             throw new IllegalTypeConversionException("Null values cannot be converted to a UUID");
         }
 
+        if (value instanceof UUID) {
+            return (UUID) value;
+        }
+
         if (value instanceof String) {
             try {
                 return UUID.fromString((String)value);
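
The change above widens toUUID from private to public and adds a short-circuit for values that are already UUID instances, while String values continue to be parsed with UUID.fromString. A minimal usage sketch, assuming nifi-record is on the classpath (illustrative only, not part of the commit):

    import java.util.UUID;

    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    public class ToUuidExample {
        public static void main(String[] args) {
            // Already a UUID: returned as-is by the new instanceof branch.
            UUID original = UUID.randomUUID();
            UUID same = DataTypeUtils.toUUID(original);

            // A String representation is still parsed via UUID.fromString(...).
            UUID parsed = DataTypeUtils.toUUID("123e4567-e89b-12d3-a456-426614174000");

            System.out.println(same.equals(original));   // true
            System.out.println(parsed);
        }
    }
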
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
new file mode 100644
index 0000000000..2860e79244
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-processors</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api-nar</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..f3574b43b4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,222 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Hive.
+
+* Hive metastore derby schema in hive-schema-3.2.0.derby.sql
+* Test methods from IcebergTestUtils.java
+* Test metastore from MetastoreCore.java, ScriptRunner.java, ThriftMetastore.java
+
+Copyright: 2011-2018 The Apache Software Foundation
+Home page: https://hive.apache.org/
+License: https://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..ddc1223787
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,505 @@
+nifi-iceberg-processors-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+  (ASLv2) Apache Iceberg
+    The following NOTICE information applies:
+      Apache Iceberg
+      Copyright 2017-2022 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Apache Ant
+    The following NOTICE information applies:
+      Apache Ant
+      Copyright 1999-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Codec
+      The following NOTICE information applies:
+        Apache Commons Codec
+        Copyright 2002-2014 The Apache Software Foundation
+
+        src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+        contains test data from http://aspell.net/test/orig/batch0.tab.
+        Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+        ===============================================================================
+
+        The content of package org.apache.commons.codec.language.bm has been translated
+        from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+        with permission from the original authors.
+        Original source copyright:
+        Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons DBCP
+    The following NOTICE information applies:
+      Apache Commons DBCP
+      Copyright 2001-2015 The Apache Software Foundation.
+
+  (ASLv2) Apache HttpComponents
+      The following NOTICE information applies:
+        Apache HttpComponents Client
+        Copyright 1999-2016 The Apache Software Foundation
+        Apache HttpComponents Core - HttpCore
+        Copyright 2006-2009 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Pool
+    The following NOTICE information applies:
+      Apache Commons Pool
+      Copyright 1999-2009 The Apache Software Foundation.
+
+  (ASLv2) Apache Commons BeanUtils
+    The following NOTICE information applies:
+      Apache Commons BeanUtils
+      Copyright 2000-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Net
+      The following NOTICE information applies:
+        Apache Commons Net
+        Copyright 2001-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2017 The Apache Software Foundation
+
+  (ASLv2) Apache Parquet
+    The following NOTICE information applies:
+      Apache Parquet MR (Incubating)
+      Copyright 2014 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Audience Annotations
+    The following NOTICE information applies:
+      Apache Yetus
+      Copyright 2008-2018 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Compress
+    The following NOTICE information applies:
+      Apache Commons Compress
+      Copyright 2002-2017 The Apache Software Foundation
+
+      The files in the package org.apache.commons.compress.archivers.sevenz
+      were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+      which has been placed in the public domain:
+
+      "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
+
+  (ASLv2) Apache Commons Configuration
+    The following NOTICE information applies:
+      Apache Commons Configuration
+      Copyright 2001-2017 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Text
+    The following NOTICE information applies:
+      Apache Commons Text
+      Copyright 2001-2018 The Apache Software Foundation
+
+  (ASLv2) Apache Commons CLI
+    The following NOTICE information applies:
+        Apache Commons CLI
+        Copyright 2001-2017 The Apache Software Foundation
+
+        This product includes software developed at
+        The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Apache Commons Collections
+    The following NOTICE information applies:
+      Apache Commons Collections
+      Copyright 2001-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Daemon
+    The following NOTICE information applies:
+      Apache Commons Daemon
+      Copyright 2002-2013 The Apache Software Foundation
+
+  (ASLv2) Apache Ivy
+      The following NOTICE information applies:
+         Copyright 2007-2017 The Apache Software Foundation
+
+         This product includes software developed at
+         The Apache Software Foundation (http://www.apache.org/).
+
+         Portions of Ivy were originally developed at
+         Jayasoft SARL (http://www.jayasoft.fr/)
+         and are licensed to the Apache Software Foundation under the
+         "Software Grant License Agreement"
+
+         SSH and SFTP support is provided by the JCraft JSch package,
+         which is open source software, available under
+         the terms of a BSD style license.
+         The original software and related information is available
+         at http://www.jcraft.com/jsch/.
+
+  (ASLv2) Apache Commons Math
+    The following NOTICE information applies:
+      Apache Commons Math
+      Copyright 2001-2012 The Apache Software Foundation
+
+  (ASLv2) Apache Hive
+    The following NOTICE information applies:
+      Apache Hive
+      Copyright 2008-2015 The Apache Software Foundation
+
+      This product includes software developed by The Apache Software
+      Foundation (http://www.apache.org/).
+
+      This product includes Jersey (https://jersey.java.net/)
+      Copyright (c) 2010-2014 Oracle and/or its affiliates.
+
+      This project includes software copyrighted by Microsoft Corporation and
+      licensed under the Apache License, Version 2.0.
+
+      This project includes software copyrighted by Dell SecureWorks and
+      licensed under the Apache License, Version 2.0.
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may be licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+       ## Credits
+
+       A list of contributors may be found from CREDITS file, which is included
+       in some artifacts (usually source distributions); but is always available
+       from the source code management (SCM) system project uses.
+
+  (ASLv2) BoneCP
+    The following NOTICE information applies:
+       BoneCP
+       Copyright 2010 Wallace Wadge
+
+  (ASLv2) Apache Hadoop
+    The following NOTICE information applies:
+      The binary distribution of this product bundles binaries of
+      org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which has the
+      following notices:
+      * Copyright 2011 Dain Sundstrom <da...@iq80.com>
+      * Copyright 2011 FuseSource Corp. http://fusesource.com
+
+      The binary distribution of this product bundles binaries of
+      org.fusesource.hawtjni:hawtjni-runtime (https://github.com/fusesource/hawtjni),
+      which has the following notices:
+      * This product includes software developed by FuseSource Corp.
+        http://fusesource.com
+      * This product includes software developed at
+        Progress Software Corporation and/or its  subsidiaries or affiliates.
+      * This product includes software developed by IBM Corporation and others.
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+  (ASLv2) Apache Curator
+    The following NOTICE information applies:
+      Curator Framework
+      Copyright 2011-2014 The Apache Software Foundation
+
+      Curator Client
+      Copyright 2011-2014 The Apache Software Foundation
+
+      Curator Recipes
+      Copyright 2011-2014 The Apache Software Foundation
+
+  (ASLv2) Apache Derby
+    The following NOTICE information applies:
+      Apache Derby
+      Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
+      the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.
+
+  (ASLv2) Apache Geronimo
+    The following NOTICE information applies:
+      Apache Geronimo
+      Copyright 2003-2008 The Apache Software Foundation
+
+  (ASLv2) Jettison
+    The following NOTICE information applies:
+       Copyright 2006 Envoi Solutions LLC
+
+  (ASLv2) Jetty
+    The following NOTICE information applies:
+       Jetty Web Container
+       Copyright 1995-2019 Mort Bay Consulting Pty Ltd.
+
+  (ASLv2) Apache log4j
+    The following NOTICE information applies:
+      Apache log4j
+      Copyright 2007 The Apache Software Foundation
+
+  (ASLv2) Apache Thrift
+    The following NOTICE information applies:
+      Apache Thrift
+      Copyright 2006-2010 The Apache Software Foundation.
+
+  (ASLv2) Dropwizard Metrics
+    The following NOTICE information applies:
+      Metrics
+      Copyright 2010-2013 Coda Hale and Yammer, Inc.
+
+      This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
+      LongAdder), which was released with the following comments:
+
+          Written by Doug Lea with assistance from members of JCP JSR-166
+          Expert Group and released to the public domain, as explained at
+          http://creativecommons.org/publicdomain/zero/1.0/
+
+  (ASLv2) Joda Time
+      The following NOTICE information applies:
+        This product includes software developed by
+        Joda.org (http://www.joda.org/).
+
+  (ASLv2) The Netty Project
+      The following NOTICE information applies:
+        The Netty Project
+        Copyright 2011 The Netty Project
+
+  (ASLv2) Apache ZooKeeper
+     The following NOTICE information applies:
+       Apache ZooKeeper
+       Copyright 2009-2012 The Apache Software Foundation
+
+  (ASLv2) Google GSON
+     The following NOTICE information applies:
+       Copyright 2008 Google Inc.
+
+  (ASLv2) JPam
+    The following NOTICE information applies:
+      Copyright 2003-2006 Greg Luck
+
+  (ASLv2) Groovy 2.4.16 (http://www.groovy-lang.org)
+        groovy-2.4.16-indy
+        groovy-json-2.4.16-indy
+        groovy-sql-2.4.16-indy
+    The following NOTICE information applies:
+        Apache Groovy
+        Copyright 2003-2018 The Apache Software Foundation
+
+        This product includes software developed at
+        The Apache Software Foundation (http://www.apache.org/).
+
+        This product includes/uses ANTLR (http://www.antlr2.org/)
+        developed by Terence Parr 1989-2006
+
+  (ASLv2) ASM Based Accessors Helper Used By JSON Smart (net.minidev:accessors-smart:jar:1.2 - http://www.minidev.net/)
+    The following NOTICE information applies:
+      ASM Based Accessors Helper Used By JSON Smart 1.2
+      Copyright 2017, Uriel Chemouni
+
+  (ASLv2) JSON Smart (net.minidev:json-smart:jar:2.3 - http://www.minidev.net/)
+        The following NOTICE information applies:
+          JSON Smart 2.3
+          Copyright 2017, Uriel Chemouni, Eitan Raviv
+
+  (ASLv2) Nimbus JOSE+JWT (com.nimbusds:nimbus-jose-jwt - https://connect2id.com/products/nimbus-jose-jwt)
+    The following NOTICE information applies:
+      Nimbus JOSE+JWT
+      Copyright 2021, Connect2id Ltd.
+
+  (ASLv2) Woodstox (com.fasterxml.woodstox:woodstox-core:bundle:5.3.0 - https://github.com/FasterXML/woodstox)
+      The following NOTICE information applies:
+        Woodstox Core 5.3.0
+        Copyright 2015, FasterXML, LLC
+
+  (ASLv2) Joda Time
+      The following NOTICE information applies:
+        This product includes software developed by
+        Joda.org (http://www.joda.org/).
+
+  (ASLv2) java-util
+    The following NOTICE information applies:
+       java-util
+       Copyright 2011-2017 Metamarkets Group Inc.
+
+  (ASLv2) JCIP Annotations Under Apache License
+    The following NOTICE information applies:
+      JCIP Annotations Under Apache License
+      Copyright 2013 Stephen Connolly.
+
+  (ASLv2) Google GSON
+    The following NOTICE information applies:
+      Copyright 2008 Google Inc.
+
+  (ASLv2) Guava
+    The following NOTICE information applies:
+      Guava
+      Copyright 2015 The Guava Authors
+
+  (ASLv2) OkHttp
+    The following NOTICE information applies:
+      OkHttp
+      Copyright (C) 2014 Square, Inc.
+
+  (ASLv2) Okio
+    The following NOTICE information applies:
+      Okio
+      Copyright (C) 2014 Square, Inc.
+
+  (ASLv2) Dropwizard Metrics
+      The following NOTICE information applies:
+        Dropwizard Metrics
+        Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
+
+  (ASLv2) atinject (javax.inject:javax.inject)
+    The following NOTICE information applies:
+      atinject
+      Copyright
+
+  (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
+
+  (ASLv2) JetBrains/java-annotations
+    The following NOTICE information applies:
+      JetBrains/java-annotations
+      Copyright 2000-2016 JetBrains s.r.o.
+
+  (ASLv2) Apache Kerby
+    The following NOTICE information applies:
+      Apache Kerby
+      Copyright 2003-2018 The Apache Software Foundation
+
+    (ASLv2) Carrotsearch HPPC
+      The following NOTICE information applies:
+      HPPC borrowed code, ideas or both from:
+
+       * Apache Lucene, http://lucene.apache.org/
+         (Apache license)
+       * Fastutil, http://fastutil.di.unimi.it/
+         (Apache license)
+       * Koloboke, https://github.com/OpenHFT/Koloboke
+         (Apache license)
+
+  (ASLv2) Ehcache 2.x
+    The following NOTICE information applies:
+      Copyright 2003-2010 Terracotta, Inc.
+
+  (ASLv2) Google Guice
+    The following NOTICE information applies:
+      Google Guice - Core Library
+      Copyright 2006-2011 Google, Inc.
+
+      Google Guice - Extensions - Servlet
+      Copyright 2006-2011 Google, Inc.
+
+  (ASLv2) Apache Arrow
+    The following NOTICE information applies:
+      Copyright 2016-2019 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+
+  (ASLv2) Apache ORC
+    The following NOTICE information applies:
+      Copyright 2013-2019 The Apache Software Foundation
+
+      This product includes software developed by The Apache Software
+      Foundation (http://www.apache.org/).
+
+      This product includes software developed by Hewlett-Packard:
+      (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+  The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net)
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/)
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/)
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/)
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-guice (com.sun.jersey.contribs:jersey-guice:jar:1.19 - https://jersey.java.net/)
+        (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.11 - https://jaxb.dev.java.net/)
+
+************************
+Common Development and Distribution License 1.0
+************************
+
+    The following binary components are provided under the Common Development and Distribution License 1.0.  See project link for details.
+
+    (CDDL 1.0) JavaServlet(TM) Specification (javax.servlet:servlet-api:jar:3.1.0 - no url available)
+    (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
+    (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
+
+*****************
+Public Domain
+*****************
+
+  The following binary components are provided to the 'Public Domain'.  See project link for details.
+
+      (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)
+
+
+************************
+Eclipse Distribution License 1.0
+************************
+
+  The following binary components are provided under the Eclipse Distribution License 1.0.
+
+      (EDL 1.0) Jakarta Activation API (jakarta.activation:jakarta.activation-api:jar:1.2.1)
+      (EDL 1.0) Jakarta XML Binding API (jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3)
+
+************************
+Eclipse Public License 2.0
+************************
+
+  The following binary components are provided under the Eclipse Public License 2.0.
+
+      (EPL 2.0) javax.ws.rs-api (https://github.com/eclipse-ee4j/jaxrs-api) javax.ws.rs:javax.ws.rs-api:bundle:2.1.1
+
+************************
+BSD License
+************************
+
+  (BSD) JSch
+    The following NOTICE information applies:
+      Copyright (c) 2002-2015 Atsuhiko Yamanaka, JCraft,Inc.
+      All rights reserved.
+      https://www.jcraft.com/jsch/
+
+  (BSD 3-Clause) JLine Bundle
+    The following NOTICE information applies:
+      Copyright (c) 2002-2007, Marc Prud'hommeaux. All rights reserved.
+      https://github.com/jline/jline1
+
+  (BSD 3-Clause) ThreeTen-Extra
+    The following NOTICE information applies:
+      Copyright (c) 2007-2022, Stephen Colebourne & Michael Nascimento Santos.
+      https://github.com/ThreeTen/threeten-extra/
+
+************************
+Go License
+************************
+
+The following binary components are provided under the Go License.  See project link for details.
+
+  (Go) RE2/J
+    The following NOTICE information applies:
+      Copyright (c) 2009 The Go Authors. All rights reserved.
+      https://github.com/google/re2j
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
new file mode 100644
index 0000000000..09c5081d08
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <!-- Internal dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+
+        <!-- External dependencies -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <classifier>core</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-common</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
new file mode 100644
index 0000000000..7dc53eefd7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+import static org.apache.nifi.processors.iceberg.PutIceberg.REL_FAILURE;
+
+/**
+ * Base Iceberg processor class.
+ */
+public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.")
+            .identifiesControllerService(KerberosUserService.class)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+    private volatile UserGroupInformation ugi;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) {
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+            try {
+                this.ugi = getUgiForKerberosUser(catalogService.getConfiguration(), kerberosUser);
+            } catch (IOException e) {
+                throw new ProcessException("Kerberos Authentication failed", e);
+            }
+        }
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+            } catch (KerberosLoginException e) {
+                getLogger().error("Error logging out kerberos user", e);
+            } finally {
+                kerberosUser = null;
+                ugi = null;
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (kerberosUser == null) {
+            doOnTrigger(context, session, flowFile);
+        } else {
+            try {
+                getUgi().doAs((PrivilegedExceptionAction<Void>) () -> {
+                    doOnTrigger(context, session, flowFile);
+                    return null;
+                });
+
+            } catch (Exception e) {
+                getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e);
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        }
+    }
+
+    private UserGroupInformation getUgi() {
+        try {
+            kerberosUser.checkTGTAndRelogin();
+        } catch (KerberosLoginException e) {
+            throw new ProcessException("Unable to re-login with kerberos credentials for " + kerberosUser.getPrincipal(), e);
+        }
+        return ugi;
+    }
+
+    protected abstract void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException;
+}
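
For context, concrete processors only implement doOnTrigger(...); the base class above pulls the FlowFile from the session and, when a Kerberos User Service is configured, wraps the call in ugi.doAs(...) and routes to failure if the privileged action throws. A hypothetical minimal subclass, sketched under the assumption that it lives in the same package (it is not part of this commit; PutIceberg below is the real implementation):

    package org.apache.nifi.processors.iceberg;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;

    public class CountingIcebergProcessor extends AbstractIcebergProcessor {

        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            // CATALOG and KERBEROS_USER_SERVICE are inherited from AbstractIcebergProcessor.
            return Arrays.asList(CATALOG, KERBEROS_USER_SERVICE);
        }

        @Override
        public Set<Relationship> getRelationships() {
            return new HashSet<>(Arrays.asList(PutIceberg.REL_SUCCESS, PutIceberg.REL_FAILURE));
        }

        @Override
        protected void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
            // Runs directly, or inside ugi.doAs(...) when a Kerberos User Service is configured.
            getLogger().info("Processing FlowFile " + flowFile.getAttribute("uuid") + " of size " + flowFile.getSize());
            session.transfer(flowFile, PutIceberg.REL_SUCCESS);
        }
    }
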
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
new file mode 100644
index 0000000000..7fb136b853
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.Tasks;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. " +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile.")
+})
+public class PutIceberg extends AbstractIcebergProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog Namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Table Name")
+            .description("The name of the table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FILE_FORMAT = new PropertyDescriptor.Builder()
+            .name("file-format")
+            .displayName("File Format")
+            .description("File format to use when writing Iceberg data files." +
+                    " If not set, then the 'write.format.default' table property will be used, default value is parquet.")
+            .allowableValues(
+                    new AllowableValue("AVRO"),
+                    new AllowableValue("PARQUET"),
+                    new AllowableValue("ORC"))
+            .build();
+
+    static final PropertyDescriptor MAXIMUM_FILE_SIZE = new PropertyDescriptor.Builder()
+            .name("maximum-file-size")
+            .displayName("Maximum File Size")
+            .description("The maximum size that a file can be, if the file size is exceeded a new file will be generated with the remaining data." +
+                    " If not set, then the 'write.target-file-size-bytes' table property will be used, default value is 512 MB.")
+            .addValidator(StandardValidators.LONG_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            CATALOG_NAMESPACE,
+            TABLE_NAME,
+            FILE_FORMAT,
+            MAXIMUM_FILE_SIZE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final String fileFormat = context.getProperty(FILE_FORMAT).evaluateAttributeExpressions().getValue();
+        final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions().getValue();
+
+        Table table;
+
+        try {
+            table = loadTable(context);
+        } catch (Exception e) {
+            getLogger().error("Failed to load table from catalog", e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        TaskWriter<org.apache.iceberg.data.Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile); final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+            final FileFormat format = getFileFormat(table.properties(), fileFormat);
+            final IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize);
+            taskWriter = taskWriterFactory.create();
+
+            final IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), reader.getSchema(), format);
+
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+                taskWriter.write(recordConverter.convert(record));
+                recordCount++;
+            }
+
+            final WriteResult result = taskWriter.complete();
+            appendDataFiles(table, result);
+        } catch (Exception e) {
+            getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e);
+            try {
+                if (taskWriter != null) {
+                    abort(taskWriter.dataFiles(), table);
+                }
+            } catch (Exception ex) {
+                getLogger().error("Failed to abort uncommitted data files", ex);
+            }
+
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    /**
+     * Loads a table from the catalog service with the provided values from the property context.
+     *
+     * @param context holds the user provided information for the {@link Catalog} and the {@link Table}
+     * @return loaded table
+     */
+    private Table loadTable(PropertyContext context) {
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+
+        final Catalog catalog = catalogService.getCatalog();
+
+        final Namespace namespace = Namespace.of(catalogNamespace);
+        final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
+
+        return catalog.loadTable(tableIdentifier);
+    }
+
+    /**
+     * Appends the pending data files to the given {@link Table}.
+     *
+     * @param table  table to append
+     * @param result datafiles created by the {@link TaskWriter}
+     */
+    private void appendDataFiles(Table table, WriteResult result) {
+        final AppendFiles appender = table.newAppend();
+        Arrays.stream(result.dataFiles()).forEach(appender::appendFile);
+
+        appender.commit();
+    }
+
+    /**
+     * Determines the write file format from the requested value and the table configuration.
+     *
+     * @param tableProperties table properties
+     * @param fileFormat      requested file format from the processor
+     * @return file format to use
+     */
+    private FileFormat getFileFormat(Map<String, String> tableProperties, String fileFormat) {
+        final String fileFormatName = fileFormat != null ? fileFormat : tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+        return FileFormat.valueOf(fileFormatName.toUpperCase(Locale.ENGLISH));
+    }
+
+    /**
+     * Deletes the completed data files that have not been committed to the table yet.
+     *
+     * @param dataFiles files created by the task writer
+     * @param table     table
+     */
+    void abort(DataFile[] dataFiles, Table table) {
+        Tasks.foreach(dataFiles)
+                .throwFailureWhenFinished()
+                .noRetry()
+                .run(file -> table.io().deleteFile(file.path().toString()));
+    }
+
+}
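For orientation only, here is a minimal same-package test sketch (not part of the commit) of how PutIceberg could be driven; the record reader and catalog controller service wiring is elided and the namespace/table names are illustrative:

    // Sketch: assumes a RecordReaderFactory service and an IcebergCatalogService have
    // already been registered and enabled on the runner (RECORD_READER and CATALOG
    // properties set to their ids), and that the table "default.users" exists.
    TestRunner runner = TestRunners.newTestRunner(PutIceberg.class);
    runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
    runner.setProperty(PutIceberg.TABLE_NAME, "users");
    runner.setProperty(PutIceberg.FILE_FORMAT, "PARQUET");
    runner.enqueue("{\"id\": 1, \"name\": \"John\"}");
    runner.run();
    runner.assertAllFlowFilesTransferred(PutIceberg.REL_SUCCESS, 1);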
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
new file mode 100644
index 0000000000..9a98404b26
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.time.ZoneId;
+
+public class ArrayElementGetter {
+
+    private static final String ARRAY_FIELD_NAME = "array element";
+
+    /**
+     * Creates an accessor for getting elements in an internal array data structure at the given
+     * position.
+     *
+     * @param dataType the element type of the array
+     */
+    public static ElementGetter createElementGetter(DataType dataType) {
+        ElementGetter elementGetter;
+        switch (dataType.getFieldType()) {
+            case STRING:
+                elementGetter = (array, pos) -> DataTypeUtils.toString(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case CHAR:
+                elementGetter = (array, pos) -> DataTypeUtils.toCharacter(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case BOOLEAN:
+                elementGetter = (array, pos) -> DataTypeUtils.toBoolean(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case DECIMAL:
+                elementGetter = (array, pos) -> DataTypeUtils.toBigDecimal(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case BYTE:
+                elementGetter = (array, pos) -> DataTypeUtils.toByte(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case SHORT:
+                elementGetter = (array, pos) -> DataTypeUtils.toShort(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case INT:
+                elementGetter = (array, pos) -> DataTypeUtils.toInteger(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case DATE:
+                elementGetter = (array, pos) -> DataTypeUtils.toLocalDate(array[pos], () -> DataTypeUtils.getDateTimeFormatter(dataType.getFormat(), ZoneId.systemDefault()), ARRAY_FIELD_NAME);
+                break;
+            case TIME:
+                elementGetter = (array, pos) -> DataTypeUtils.toTime(array[pos], () -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
+                break;
+            case LONG:
+                elementGetter = (array, pos) -> DataTypeUtils.toLong(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case BIGINT:
+                elementGetter = (array, pos) -> DataTypeUtils.toBigInt(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case FLOAT:
+                elementGetter = (array, pos) -> DataTypeUtils.toFloat(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case DOUBLE:
+                elementGetter = (array, pos) -> DataTypeUtils.toDouble(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case TIMESTAMP:
+                elementGetter = (array, pos) -> DataTypeUtils.toTimestamp(array[pos], () -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
+                break;
+            case UUID:
+                elementGetter = (array, pos) -> DataTypeUtils.toUUID(array[pos]);
+                break;
+            case ARRAY:
+                elementGetter = (array, pos) -> DataTypeUtils.toArray(array[pos], ARRAY_FIELD_NAME, ((ArrayDataType) dataType).getElementType());
+                break;
+            case MAP:
+                elementGetter = (array, pos) -> DataTypeUtils.toMap(array[pos], ARRAY_FIELD_NAME);
+                break;
+            case RECORD:
+                elementGetter = (array, pos) -> DataTypeUtils.toRecord(array[pos], ARRAY_FIELD_NAME);
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+
+        return (array, pos) -> {
+            if (array[pos] == null) {
+                return null;
+            }
+
+            return elementGetter.getElementOrNull(array, pos);
+        };
+    }
+
+    /**
+     * Accessor for getting the elements of an array during runtime.
+     */
+    public interface ElementGetter extends Serializable {
+        @Nullable
+        Object getElementOrNull(Object[] array, int pos);
+    }
+}
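A small illustrative sketch (not part of the commit) of how the element getter coerces values and passes nulls through, using the NiFi record types already imported above:

    // Sketch: coercing elements of an Object[] to String while leaving nulls untouched
    ArrayElementGetter.ElementGetter getter =
            ArrayElementGetter.createElementGetter(RecordFieldType.STRING.getDataType());
    Object[] values = {"a", null, 42};
    Object first = getter.getElementOrNull(values, 0);   // "a"
    Object second = getter.getElementOrNull(values, 1);  // null is returned without coercion
    Object third = getter.getElementOrNull(values, 2);   // "42", coerced via DataTypeUtils.toString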
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/DataConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/DataConverter.java
new file mode 100644
index 0000000000..1ea761db48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/DataConverter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+/**
+ * Interface for data conversion between NiFi Record and Iceberg Record.
+ */
+public interface DataConverter<D, T> {
+    T convert(D data);
+}
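Since the interface has a single abstract method, converters can also be written as lambdas; a trivial illustrative sketch (not in the commit):

    // Sketch: a converter from java.util.Date to its epoch millisecond value
    DataConverter<java.util.Date, Long> toEpochMillis = date -> date.getTime();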
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
new file mode 100644
index 0000000000..d216299d18
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.createFieldGetter;
+
+/**
+ * Data converter implementations for different data types.
+ */
+public class GenericDataConverters {
+
+    static class SameTypeConverter implements DataConverter<Object, Object> {
+
+        static final SameTypeConverter INSTANCE = new SameTypeConverter();
+
+        @Override
+        public Object convert(Object data) {
+            return data;
+        }
+    }
+
+    static class TimeConverter implements DataConverter<Time, LocalTime> {
+
+        static final TimeConverter INSTANCE = new TimeConverter();
+
+        @Override
+        public LocalTime convert(Time data) {
+            return data.toLocalTime();
+        }
+    }
+
+    static class TimestampConverter implements DataConverter<Timestamp, LocalDateTime> {
+
+        static final TimestampConverter INSTANCE = new TimestampConverter();
+
+        @Override
+        public LocalDateTime convert(Timestamp data) {
+            return data.toLocalDateTime();
+        }
+    }
+
+    static class TimestampWithTimezoneConverter implements DataConverter<Timestamp, OffsetDateTime> {
+
+        static final TimestampWithTimezoneConverter INSTANCE = new TimestampWithTimezoneConverter();
+
+        @Override
+        public OffsetDateTime convert(Timestamp data) {
+            return OffsetDateTime.ofInstant(data.toInstant(), ZoneId.of("UTC"));
+        }
+    }
+
+    static class UUIDtoByteArrayConverter implements DataConverter<UUID, byte[]> {
+
+        static final UUIDtoByteArrayConverter INSTANCE = new UUIDtoByteArrayConverter();
+
+        @Override
+        public byte[] convert(UUID data) {
+            ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+            byteBuffer.putLong(data.getMostSignificantBits());
+            byteBuffer.putLong(data.getLeastSignificantBits());
+            return byteBuffer.array();
+        }
+    }
+
+    static class FixedConverter implements DataConverter<Byte[], byte[]> {
+
+        private final int length;
+
+        FixedConverter(int length) {
+            this.length = length;
+        }
+
+        @Override
+        public byte[] convert(Byte[] data) {
+            Validate.isTrue(data.length == length, String.format("Cannot write byte array of length %s as fixed[%s]", data.length, length));
+            return ArrayUtils.toPrimitive(data);
+        }
+    }
+
+    static class BinaryConverter implements DataConverter<Byte[], ByteBuffer> {
+
+        static final BinaryConverter INSTANCE = new BinaryConverter();
+
+        @Override
+        public ByteBuffer convert(Byte[] data) {
+            return ByteBuffer.wrap(ArrayUtils.toPrimitive(data));
+        }
+    }
+
+    static class BigDecimalConverter implements DataConverter<BigDecimal, BigDecimal> {
+
+        private final int precision;
+        private final int scale;
+
+        BigDecimalConverter(int precision, int scale) {
+            this.precision = precision;
+            this.scale = scale;
+        }
+
+        @Override
+        public BigDecimal convert(BigDecimal data) {
+            Validate.isTrue(data.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
+            Validate.isTrue(data.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision: %s", precision, scale, data);
+            return data;
+        }
+    }
+
+    static class ArrayConverter<T, S> implements DataConverter<T[], List<S>> {
+        private final DataConverter<T, S> fieldConverter;
+        private final ArrayElementGetter.ElementGetter elementGetter;
+
+        ArrayConverter(DataConverter<T, S> elementConverter, DataType dataType) {
+            this.fieldConverter = elementConverter;
+            this.elementGetter = ArrayElementGetter.createElementGetter(dataType);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public List<S> convert(T[] data) {
+            final int numElements = data.length;
+            List<S> result = new ArrayList<>(numElements);
+            for (int i = 0; i < numElements; i += 1) {
+                result.add(i, fieldConverter.convert((T) elementGetter.getElementOrNull(data, i)));
+            }
+            return result;
+        }
+    }
+
+    static class MapConverter<K, V, L, B> implements DataConverter<Map<K, V>, Map<L, B>> {
+        private final DataConverter<K, L> keyConverter;
+        private final DataConverter<V, B> valueConverter;
+        private final ArrayElementGetter.ElementGetter keyGetter;
+        private final ArrayElementGetter.ElementGetter valueGetter;
+
+        MapConverter(DataConverter<K, L> keyConverter, DataType keyType, DataConverter<V, B> valueConverter, DataType valueType) {
+            this.keyConverter = keyConverter;
+            this.keyGetter = ArrayElementGetter.createElementGetter(keyType);
+            this.valueConverter = valueConverter;
+            this.valueGetter = ArrayElementGetter.createElementGetter(valueType);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public Map<L, B> convert(Map<K, V> data) {
+            final int mapSize = data.size();
+            final Object[] keyArray = data.keySet().toArray();
+            final Object[] valueArray = data.values().toArray();
+            Map<L, B> result = new HashMap<>(mapSize);
+            for (int i = 0; i < mapSize; i += 1) {
+                result.put(keyConverter.convert((K) keyGetter.getElementOrNull(keyArray, i)), valueConverter.convert((V) valueGetter.getElementOrNull(valueArray, i)));
+            }
+
+            return result;
+        }
+    }
+
+    static class RecordConverter implements DataConverter<Record, GenericRecord> {
+
+        private final DataConverter<?, ?>[] converters;
+        private final RecordFieldGetter.FieldGetter[] getters;
+
+        private final Types.StructType schema;
+
+        RecordConverter(List<DataConverter<?, ?>> converters, List<RecordField> recordFields, Types.StructType schema) {
+            this.schema = schema;
+            this.converters = (DataConverter<?, ?>[]) Array.newInstance(DataConverter.class, converters.size());
+            this.getters = new RecordFieldGetter.FieldGetter[converters.size()];
+            for (int i = 0; i < converters.size(); i += 1) {
+                final RecordField recordField = recordFields.get(i);
+                this.converters[i] = converters.get(i);
+                this.getters[i] = createFieldGetter(recordField.getDataType(), recordField.getFieldName(), recordField.isNullable());
+            }
+        }
+
+        @Override
+        public GenericRecord convert(Record data) {
+            final GenericRecord template = GenericRecord.create(schema);
+            // GenericRecord.copy() is more performant than GenericRecord.create(StructType) since NAME_MAP_CACHE access is eliminated. Using copy here to gain performance.
+            final GenericRecord result = template.copy();
+
+            for (int i = 0; i < converters.length; i += 1) {
+                result.set(i, convert(data, i, converters[i]));
+            }
+
+            return result;
+        }
+
+        @SuppressWarnings("unchecked")
+        private <T, S> S convert(Record record, int pos, DataConverter<T, S> converter) {
+            return converter.convert((T) getters[pos].getFieldOrNull(record));
+        }
+    }
+}
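An illustrative same-package sketch (not part of the commit) of a few of the conversions above, showing the representations the Iceberg writers expect:

    // Sketch: java.sql values converted into Iceberg-friendly types
    LocalTime localTime = GenericDataConverters.TimeConverter.INSTANCE
            .convert(Time.valueOf("10:06:17"));                       // 10:06:17
    OffsetDateTime utcTimestamp = GenericDataConverters.TimestampWithTimezoneConverter.INSTANCE
            .convert(Timestamp.valueOf("2022-09-06 10:06:17"));       // same instant, expressed at UTC
    byte[] uuidBytes = GenericDataConverters.UUIDtoByteArrayConverter.INSTANCE
            .convert(UUID.randomUUID());                              // 16 bytes, most significant bits first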
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
new file mode 100644
index 0000000000..611f4b29b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is responsible for schema traversal and data conversion between the NiFi and Iceberg internal record structures.
+ */
+public class IcebergRecordConverter {
+
+    private final DataConverter<Record, GenericRecord> converter;
+
+    public GenericRecord convert(Record record) {
+        return converter.convert(record);
+    }
+
+    @SuppressWarnings("unchecked")
+    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
+        this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
+    }
+
+    private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
+
+        public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
+            return visit(schema, recordDataType, new IcebergSchemaVisitor(), new IcebergPartnerAccessors(fileFormat));
+        }
+
+        @Override
+        public DataConverter<?, ?> schema(Schema schema, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> field(Types.NestedField field, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> primitive(Type.PrimitiveType type, DataType dataType) {
+            if (type.typeId() != null) {
+                switch (type.typeId()) {
+                    case BOOLEAN:
+                    case INTEGER:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case DATE:
+                    case STRING:
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case TIME:
+                        return GenericDataConverters.TimeConverter.INSTANCE;
+                    case TIMESTAMP:
+                        final Types.TimestampType timestampType = (Types.TimestampType) type;
+                        if (timestampType.shouldAdjustToUTC()) {
+                            return GenericDataConverters.TimestampWithTimezoneConverter.INSTANCE;
+                        }
+                        return GenericDataConverters.TimestampConverter.INSTANCE;
+                    case UUID:
+                        final UUIDDataType uuidType = (UUIDDataType) dataType;
+                        if (uuidType.getFileFormat() == FileFormat.PARQUET) {
+                            return GenericDataConverters.UUIDtoByteArrayConverter.INSTANCE;
+                        }
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case FIXED:
+                        final Types.FixedType fixedType = (Types.FixedType) type;
+                        return new GenericDataConverters.FixedConverter(fixedType.length());
+                    case BINARY:
+                        return GenericDataConverters.BinaryConverter.INSTANCE;
+                    case DECIMAL:
+                        final Types.DecimalType decimalType = (Types.DecimalType) type;
+                        return new GenericDataConverters.BigDecimalConverter(decimalType.precision(), decimalType.scale());
+                    default:
+                        throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
+                }
+            }
+            throw new UnsupportedOperationException("Missing type id from PrimitiveType " + type);
+        }
+
+        @Override
+        public DataConverter<?, ?> struct(Types.StructType type, DataType dataType, List<DataConverter<?, ?>> converters) {
+            Validate.notNull(type, "Can not create reader for null type");
+            final List<RecordField> recordFields = ((RecordDataType) dataType).getChildSchema().getFields();
+            return new GenericDataConverters.RecordConverter(converters, recordFields, type);
+        }
+
+        @Override
+        public DataConverter<?, ?> list(Types.ListType listTypeInfo, DataType dataType, DataConverter<?, ?> converter) {
+            return new GenericDataConverters.ArrayConverter<>(converter, ((ArrayDataType) dataType).getElementType());
+        }
+
+        @Override
+        public DataConverter<?, ?> map(Types.MapType mapType, DataType dataType, DataConverter<?, ?> keyConverter, DataConverter<?, ?> valueConverter) {
+            return new GenericDataConverters.MapConverter<>(keyConverter, RecordFieldType.STRING.getDataType(), valueConverter, ((MapDataType) dataType).getValueType());
+        }
+    }
+
+    public static class IcebergPartnerAccessors implements SchemaWithPartnerVisitor.PartnerAccessors<DataType> {
+        private final FileFormat fileFormat;
+
+        IcebergPartnerAccessors(FileFormat fileFormat) {
+            this.fileFormat = fileFormat;
+        }
+
+        @Override
+        public DataType fieldPartner(DataType dataType, int fieldId, String name) {
+            Validate.isTrue(dataType instanceof RecordDataType, String.format("Invalid record: %s is not a record", dataType));
+            final RecordDataType recordType = (RecordDataType) dataType;
+            final Optional<RecordField> recordField = recordType.getChildSchema().getField(name);
+
+            Validate.isTrue(recordField.isPresent(), String.format("Cannot find record field with name %s", name));
+            final RecordField field = recordField.get();
+
+            if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)) {
+                return new UUIDDataType(field.getDataType(), fileFormat);
+            }
+
+            return field.getDataType();
+        }
+
+        @Override
+        public DataType mapKeyPartner(DataType dataType) {
+            return RecordFieldType.STRING.getDataType();
+        }
+
+        @Override
+        public DataType mapValuePartner(DataType dataType) {
+            Validate.isTrue(dataType instanceof MapDataType, String.format("Invalid map: %s is not a map", dataType));
+            final MapDataType mapType = (MapDataType) dataType;
+            return mapType.getValueType();
+        }
+
+        @Override
+        public DataType listElementPartner(DataType dataType) {
+            Validate.isTrue(dataType instanceof ArrayDataType, String.format("Invalid array: %s is not an array", dataType));
+            final ArrayDataType arrayType = (ArrayDataType) dataType;
+            return arrayType.getElementType();
+        }
+    }
+
+    /**
+     * The Parquet writer expects the UUID value in a different format, so it needs to be converted differently: <a href="https://github.com/apache/iceberg/issues/1881">#1881</a>
+     */
+    private static class UUIDDataType extends DataType {
+
+        private final FileFormat fileFormat;
+
+        UUIDDataType(DataType dataType, FileFormat fileFormat) {
+            super(dataType.getFieldType(), dataType.getFormat());
+            this.fileFormat = fileFormat;
+        }
+
+        public FileFormat getFileFormat() {
+            return fileFormat;
+        }
+    }
+}
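A minimal sketch (not part of the commit; it mirrors the unit tests further below) of converting a NiFi record into an Iceberg record with this class:

    // Sketch: converting a single-field NiFi MapRecord into an Iceberg GenericRecord
    Schema icebergSchema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
    RecordSchema nifiSchema = new SimpleRecordSchema(
            Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType())));
    IcebergRecordConverter converter = new IcebergRecordConverter(icebergSchema, nifiSchema, FileFormat.PARQUET);
    GenericRecord icebergRecord = converter.convert(new MapRecord(nifiSchema, Collections.singletonMap("id", 1)));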
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
new file mode 100644
index 0000000000..ca50c49f89
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.time.ZoneId;
+
+public class RecordFieldGetter {
+
+    /**
+     * Creates an accessor for getting elements in an internal record data structure with the given
+     * field name.
+     *
+     * @param dataType   the element type of the field
+     * @param fieldName  the name of the field
+     * @param isNullable indicates if the field's value is nullable
+     */
+    public static FieldGetter createFieldGetter(DataType dataType, String fieldName, boolean isNullable) {
+        FieldGetter fieldGetter;
+        switch (dataType.getFieldType()) {
+            case STRING:
+                fieldGetter = record -> record.getAsString(fieldName);
+                break;
+            case CHAR:
+                fieldGetter = record -> DataTypeUtils.toCharacter(record.getValue(fieldName), fieldName);
+                break;
+            case BOOLEAN:
+                fieldGetter = record -> record.getAsBoolean(fieldName);
+                break;
+            case DECIMAL:
+                fieldGetter = record -> DataTypeUtils.toBigDecimal(record.getValue(fieldName), fieldName);
+                break;
+            case BYTE:
+                fieldGetter = record -> DataTypeUtils.toByte(record.getValue(fieldName), fieldName);
+                break;
+            case SHORT:
+                fieldGetter = record -> DataTypeUtils.toShort(record.getValue(fieldName), fieldName);
+                break;
+            case INT:
+                fieldGetter = record -> record.getAsInt(fieldName);
+                break;
+            case DATE:
+                fieldGetter = record -> DataTypeUtils.toLocalDate(record.getValue(fieldName), () -> DataTypeUtils.getDateTimeFormatter(dataType.getFormat(), ZoneId.systemDefault()), fieldName);
+                break;
+            case TIME:
+                fieldGetter = record -> DataTypeUtils.toTime(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(dataType.getFormat()), fieldName);
+                break;
+            case LONG:
+                fieldGetter = record -> record.getAsLong(fieldName);
+                break;
+            case BIGINT:
+                fieldGetter = record -> DataTypeUtils.toBigInt(record.getValue(fieldName), fieldName);
+                break;
+            case FLOAT:
+                fieldGetter = record -> record.getAsFloat(fieldName);
+                break;
+            case DOUBLE:
+                fieldGetter = record -> record.getAsDouble(fieldName);
+                break;
+            case TIMESTAMP:
+                fieldGetter = record -> DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(dataType.getFormat()), fieldName);
+                break;
+            case UUID:
+                fieldGetter = record -> DataTypeUtils.toUUID(record.getValue(fieldName));
+                break;
+            case ARRAY:
+                fieldGetter = record -> DataTypeUtils.toArray(record.getValue(fieldName), fieldName, ((ArrayDataType) dataType).getElementType());
+                break;
+            case MAP:
+                fieldGetter = record -> DataTypeUtils.toMap(record.getValue(fieldName), fieldName);
+                break;
+            case RECORD:
+                fieldGetter = record -> record.getAsRecord(fieldName, ((RecordDataType) dataType).getChildSchema());
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+
+        if (!isNullable) {
+            return fieldGetter;
+        }
+
+        return record -> {
+            if (record.getValue(fieldName) == null) {
+                return null;
+            }
+
+            return fieldGetter.getFieldOrNull(record);
+        };
+    }
+
+    /**
+     * Accessor for getting the field of a record during runtime.
+     */
+    public interface FieldGetter extends Serializable {
+        @Nullable
+        Object getFieldOrNull(Record record);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergPartitionedWriter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergPartitionedWriter.java
new file mode 100644
index 0000000000..fca8b92fc8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergPartitionedWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+
+/**
+ * This class adapts {@link Record} for partitioned writing.
+ */
+public class IcebergPartitionedWriter extends PartitionedFanoutWriter<Record> {
+
+    private final PartitionKey partitionKey;
+    private final InternalRecordWrapper wrapper;
+
+    IcebergPartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory,
+                             FileIO io, long targetFileSize, Schema schema) {
+        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        this.partitionKey = new PartitionKey(spec, schema);
+        this.wrapper = new InternalRecordWrapper(schema.asStruct());
+    }
+
+    @Override
+    protected PartitionKey partition(Record record) {
+        partitionKey.partition(wrapper.wrap(record));
+        return partitionKey;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergTaskWriterFactory.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergTaskWriterFactory.java
new file mode 100644
index 0000000000..4dc2bbb6c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergTaskWriterFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.util.PropertyUtil;
+
+/**
+ * Factory class to create the appropriate {@link TaskWriter} based on the {@link Table}'s properties.
+ */
+public class IcebergTaskWriterFactory {
+
+    private final Schema schema;
+    private final PartitionSpec spec;
+    private final FileIO io;
+    private final long targetFileSize;
+    private final FileFormat fileFormat;
+    private final FileAppenderFactory<Record> appenderFactory;
+    private final OutputFileFactory outputFileFactory;
+
+    public IcebergTaskWriterFactory(Table table, long taskId, FileFormat fileFormat, String targetFileSize) {
+        this.schema = table.schema();
+        this.spec = table.spec();
+        this.io = table.io();
+        this.fileFormat = fileFormat;
+
+        this.targetFileSize = targetFileSize != null ? Long.parseLong(targetFileSize) :
+                PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+        this.outputFileFactory = OutputFileFactory.builderFor(table, table.spec().specId(), taskId).format(fileFormat).build();
+        this.appenderFactory = new GenericAppenderFactory(schema, spec);
+    }
+
+    public TaskWriter<Record> create() {
+        if (spec.isUnpartitioned()) {
+            return new UnpartitionedWriter<>(spec, fileFormat, appenderFactory, outputFileFactory, io, targetFileSize);
+        } else {
+            return new IcebergPartitionedWriter(spec, fileFormat, appenderFactory, outputFileFactory, io, targetFileSize, schema);
+        }
+    }
+}
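A minimal sketch (not part of the commit) of the write/complete cycle the processor performs with this factory; `table` and `icebergRecord` are assumed to exist:

    // Sketch: the factory picks an unpartitioned or fanout writer based on table.spec()
    IcebergTaskWriterFactory factory = new IcebergTaskWriterFactory(table, 1L, FileFormat.PARQUET, null);
    TaskWriter<Record> writer = factory.create();
    writer.write(icebergRecord);               // buffered into a data file, rolled over at the target size
    WriteResult result = writer.complete();    // finished data files, ready for table.newAppend()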
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..f884831b07
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.processors.iceberg.PutIceberg
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java
new file mode 100644
index 0000000000..0b5403986b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+public class TestFileAbort {
+
+    private static final Namespace NAMESPACE = Namespace.of("default");
+    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "abort");
+
+    private static final Schema ABORT_SCHEMA = new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get())
+    );
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void abortUncommittedFiles() throws IOException {
+        Table table = initCatalog();
+
+        List<RecordField> recordFields = Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType()));
+        RecordSchema abortSchema = new SimpleRecordSchema(recordFields);
+
+        List<MapRecord> recordList = new ArrayList<>();
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 1)));
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 2)));
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 3)));
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 4)));
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 5)));
+
+        IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null);
+        TaskWriter<Record> taskWriter = taskWriterFactory.create();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
+
+        for (MapRecord record : recordList) {
+            taskWriter.write(recordConverter.convert(record));
+        }
+
+        DataFile[] dataFiles = taskWriter.dataFiles();
+
+        // DataFiles written by the taskWriter should exist
+        for (DataFile dataFile : dataFiles) {
+            Assertions.assertTrue(Files.exists(Paths.get(dataFile.path().toString())));
+        }
+
+        PutIceberg icebergProcessor = new PutIceberg();
+        icebergProcessor.abort(taskWriter.dataFiles(), table);
+
+        // DataFiles shouldn't exist after aborting them
+        for (DataFile dataFile : dataFiles) {
+            Assertions.assertFalse(Files.exists(Paths.get(dataFile.path().toString())));
+        }
+    }
+
+    private Table initCatalog() throws IOException {
+        TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
+        Catalog catalog = catalogService.getCatalog();
+
+        return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
new file mode 100644
index 0000000000..1308316e16
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static java.io.File.createTempFile;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+public class TestIcebergRecordConverter {
+
+    private OutputFile tempFile;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        tempFile = Files.localOutput(createTempFile("test", null));
+    }
+
+    @AfterEach
+    public void tearDown() {
+        File file = new File(tempFile.location());
+        file.deleteOnExit();
+    }
+
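+    // Iceberg schemas used by the converter tests; the matching NiFi RecordSchemas are built in the helper methods below.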
+    private static final Schema STRUCT = new Schema(
+            Types.NestedField.required(0, "struct", Types.StructType.of(
+                    Types.NestedField.required(1, "nested_struct", Types.StructType.of(
+                            Types.NestedField.required(2, "string", Types.StringType.get()),
+                            Types.NestedField.required(3, "integer", Types.IntegerType.get()))
+                    )
+            ))
+    );
+
+    private static final Schema LIST = new Schema(
+            Types.NestedField.required(0, "list", Types.ListType.ofRequired(
+                            1, Types.ListType.ofRequired(
+                                    2, Types.StringType.get())
+                    )
+            )
+    );
+
+    private static final Schema MAP = new Schema(
+            Types.NestedField.required(0, "map", Types.MapType.ofRequired(
+                    1, 2, Types.StringType.get(), Types.MapType.ofRequired(
+                            3, 4, Types.StringType.get(), Types.LongType.get()
+                    )
+            ))
+    );
+
+    private static final Schema PRIMITIVES = new Schema(
+            Types.NestedField.optional(0, "string", Types.StringType.get()),
+            Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "float", Types.FloatType.get()),
+            Types.NestedField.optional(3, "long", Types.LongType.get()),
+            Types.NestedField.optional(4, "double", Types.DoubleType.get()),
+            Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)),
+            Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
+            Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)),
+            Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
+            Types.NestedField.optional(9, "date", Types.DateType.get()),
+            Types.NestedField.optional(10, "time", Types.TimeType.get()),
+            Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
+            Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
+            Types.NestedField.optional(13, "uuid", Types.UUIDType.get())
+    );
+
+    private static RecordSchema getStructSchema() {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("struct", new RecordDataType(getNestedStructSchema())));
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private static RecordSchema getNestedStructSchema() {
+        List<RecordField> nestedFields = new ArrayList<>();
+        nestedFields.add(new RecordField("nested_struct", new RecordDataType(getNestedStructSchema2())));
+
+        return new SimpleRecordSchema(nestedFields);
+    }
+
+    private static RecordSchema getNestedStructSchema2() {
+        List<RecordField> nestedFields2 = new ArrayList<>();
+        nestedFields2.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
+        nestedFields2.add(new RecordField("integer", RecordFieldType.INT.getDataType()));
+
+        return new SimpleRecordSchema(nestedFields2);
+    }
+
+    private static RecordSchema getListSchema() {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("list", new ArrayDataType(
+                new ArrayDataType(RecordFieldType.STRING.getDataType()))));
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private static RecordSchema getMapSchema() {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("map", new MapDataType(
+                new MapDataType(RecordFieldType.LONG.getDataType()))));
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private static RecordSchema getPrimitivesSchema() {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("integer", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
+        fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
+        fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
+        fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()));
+        fields.add(new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+        fields.add(new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+        fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+        fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
+        fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+        fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType()));
+        fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType()));
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private static Record setupStructTestRecord() {
+        Map<String, Object> nestedValues2 = new HashMap<>();
+        nestedValues2.put("string", "Test String");
+        nestedValues2.put("integer", 10);
+        MapRecord nestedRecord2 = new MapRecord(getNestedStructSchema2(), nestedValues2);
+
+        Map<String, Object> nestedValues = new HashMap<>();
+        nestedValues.put("nested_struct", nestedRecord2);
+        MapRecord nestedRecord = new MapRecord(getNestedStructSchema(), nestedValues);
+
+        Map<String, Object> values = new HashMap<>();
+        values.put("struct", nestedRecord);
+        return new MapRecord(getStructSchema(), values);
+    }
+
+    private static Record setupListTestRecord() {
+        List<String> nestedList = new ArrayList<>();
+        nestedList.add("Test String");
+
+        List<Collection> list = new ArrayList<>();
+        list.add(nestedList);
+
+        Map<String, Object> values = new HashMap<>();
+        values.put("list", list);
+
+        return new MapRecord(getListSchema(), values);
+    }
+
+    private static Record setupMapTestRecord() {
+        Map<String, Long> nestedMap = new HashMap<>();
+        nestedMap.put("nested_key", 42L);
+
+        Map<String, Map> map = new HashMap<>();
+        map.put("key", nestedMap);
+
+        Map<String, Object> values = new HashMap<>();
+        values.put("map", map);
+
+        return new MapRecord(getMapSchema(), values);
+    }
+
+    private static Record setupPrimitivesTestRecord(RecordSchema schema) {
+        LocalDate localDate = LocalDate.of(2017, 4, 4);
+        LocalTime localTime = LocalTime.of(14, 20, 33);
+        LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+        OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+        Map<String, Object> values = new HashMap<>();
+        values.put("string", "Test String");
+        values.put("integer", 8);
+        values.put("float", 1.23456F);
+        values.put("long", 42L);
+        values.put("double", 3.14159D);
+        values.put("decimal", new BigDecimal("12345678.12"));
+        values.put("boolean", true);
+        values.put("fixed", "hello".getBytes());
+        values.put("binary", "hello".getBytes());
+        values.put("date", localDate);
+        values.put("time", Time.valueOf(localTime));
+        values.put("timestamp", Timestamp.from(offsetDateTime.toInstant()));
+        values.put("timestampTz", Timestamp.valueOf(localDateTime));
+        values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
+
+        return new MapRecord(schema, values);
+    }
+
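+    // Round-trip tests: convert a NiFi record to an Iceberg GenericRecord, write it in the given file format, then read it back and verify the field values.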
+    @Test
+    public void testPrimitivesAvro() throws IOException {
+        RecordSchema nifiSchema = getPrimitivesSchema();
+        Record record = setupPrimitivesTestRecord(nifiSchema);
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.AVRO);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToAvro(PRIMITIVES, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromAvro(PRIMITIVES, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        GenericRecord resultRecord = results.get(0);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+        OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+        Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
+        Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
+        Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
+        Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
+        Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
+        Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
+        Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
+        Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
+        Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
+        Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
+        Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
+        Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
+        Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
+        Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testPrimitivesOrc() throws IOException {
+        RecordSchema nifiSchema = getPrimitivesSchema();
+        Record record = setupPrimitivesTestRecord(nifiSchema);
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.ORC);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToOrc(PRIMITIVES, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromOrc(PRIMITIVES, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        GenericRecord resultRecord = results.get(0);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+        OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+        Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
+        Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
+        Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
+        Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
+        Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
+        Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
+        Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
+        Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
+        Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
+        Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
+        Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
+        Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
+        Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
+        Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
+    }
+
+    @Test
+    public void testPrimitivesParquet() throws IOException {
+        RecordSchema nifiSchema = getPrimitivesSchema();
+        Record record = setupPrimitivesTestRecord(nifiSchema);
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.PARQUET);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToParquet(PRIMITIVES, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromParquet(PRIMITIVES, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        GenericRecord resultRecord = results.get(0);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+        OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+        Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
+        Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
+        Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
+        Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
+        Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
+        Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
+        Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
+        Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
+        Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
+        Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
+        Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
+        Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
+        Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
+        Assertions.assertArrayEquals(resultRecord.get(13, byte[].class), new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
+    }
+
+    @Test
+    public void testStructAvro() throws IOException {
+        RecordSchema nifiSchema = getStructSchema();
+        Record record = setupStructTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.AVRO);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToAvro(STRUCT, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromAvro(STRUCT, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
+        GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedRecord.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
+        GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
+
+        Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
+        Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testStructOrc() throws IOException {
+        RecordSchema nifiSchema = getStructSchema();
+        Record record = setupStructTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.ORC);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToOrc(STRUCT, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromOrc(STRUCT, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
+        GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedRecord.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
+        GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
+
+        Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
+        Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
+    }
+
+    @Test
+    public void testStructParquet() throws IOException {
+        RecordSchema nifiSchema = getStructSchema();
+        Record record = setupStructTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.PARQUET);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToParquet(STRUCT, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromParquet(STRUCT, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
+        GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedRecord.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
+        GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
+
+        Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
+        Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
+    }
+
+    @Test
+    public void testListAvro() throws IOException {
+        RecordSchema nifiSchema = getListSchema();
+        Record record = setupListTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.AVRO);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToAvro(LIST, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromAvro(LIST, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(List.class, resultRecord.get(0));
+        List nestedList = (List) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedList.size(), 1);
+        Assertions.assertInstanceOf(List.class, nestedList.get(0));
+        List baseList = (List) nestedList.get(0);
+
+        Assertions.assertEquals(baseList.get(0), "Test String");
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testListOrc() throws IOException {
+        RecordSchema nifiSchema = getListSchema();
+        Record record = setupListTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.ORC);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToOrc(LIST, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromOrc(LIST, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(List.class, resultRecord.get(0));
+        List nestedList = (List) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedList.size(), 1);
+        Assertions.assertInstanceOf(List.class, nestedList.get(0));
+        List baseList = (List) nestedList.get(0);
+
+        Assertions.assertEquals(baseList.get(0), "Test String");
+    }
+
+    @Test
+    public void testListParquet() throws IOException {
+        RecordSchema nifiSchema = getListSchema();
+        Record record = setupListTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.PARQUET);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToParquet(LIST, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromParquet(LIST, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(List.class, resultRecord.get(0));
+        List nestedList = (List) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedList.size(), 1);
+        Assertions.assertInstanceOf(List.class, nestedList.get(0));
+        List baseList = (List) nestedList.get(0);
+
+        Assertions.assertEquals(baseList.get(0), "Test String");
+    }
+
+    @Test
+    public void testMapAvro() throws IOException {
+        RecordSchema nifiSchema = getMapSchema();
+        Record record = setupMapTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.AVRO);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToAvro(MAP, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromAvro(MAP, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
+        Map nestedMap = (Map) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedMap.size(), 1);
+        Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
+        Map baseMap = (Map) nestedMap.get("key");
+
+        Assertions.assertEquals(baseMap.get("nested_key"), 42L);
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testMapOrc() throws IOException {
+        RecordSchema nifiSchema = getMapSchema();
+        Record record = setupMapTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.ORC);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToOrc(MAP, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromOrc(MAP, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
+        Map nestedMap = (Map) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedMap.size(), 1);
+        Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
+        Map baseMap = (Map) nestedMap.get("key");
+
+        Assertions.assertEquals(baseMap.get("nested_key"), 42L);
+    }
+
+    @Test
+    public void testMapParquet() throws IOException {
+        RecordSchema nifiSchema = getMapSchema();
+        Record record = setupMapTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.PARQUET);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeToParquet(MAP, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFromParquet(MAP, tempFile.toInputFile());
+
+        Assertions.assertEquals(results.size(), 1);
+        Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+        GenericRecord resultRecord = results.get(0);
+
+        Assertions.assertEquals(resultRecord.size(), 1);
+        Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
+        Map nestedMap = (Map) resultRecord.get(0);
+
+        Assertions.assertEquals(nestedMap.size(), 1);
+        Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
+        Map baseMap = (Map) nestedMap.get("key");
+
+        Assertions.assertEquals(baseMap.get("nested_key"), 42L);
+    }
+
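+    // Schema mismatch tests: a record converted with the LIST schema but written with the STRUCT schema should fail, since the ArrayList value cannot be cast to the expected struct type.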
+    @Test
+    public void testSchemaMismatchAvro() {
+        RecordSchema nifiSchema = getListSchema();
+        Record record = setupListTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.AVRO);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        DataFileWriter.AppendWriteException e = assertThrows(DataFileWriter.AppendWriteException.class, () -> writeToAvro(STRUCT, genericRecord, tempFile));
+        assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"), e.getMessage());
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testSchemaMismatchOrc() {
+        RecordSchema nifiSchema = getListSchema();
+        Record record = setupListTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.ORC);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        ClassCastException e = assertThrows(ClassCastException.class, () -> writeToOrc(STRUCT, genericRecord, tempFile));
+        assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
+    }
+
+    @Test
+    public void testSchemaMismatchParquet() {
+        RecordSchema nifiSchema = getListSchema();
+        Record record = setupListTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.PARQUET);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        ClassCastException e = assertThrows(ClassCastException.class, () -> writeToParquet(STRUCT, genericRecord, tempFile));
+        assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
+    }
+
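+    // Helpers that write a single record and read it back using Iceberg's generic Avro/ORC/Parquet writers and readers.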
+    public void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
+        try (FileAppender<GenericRecord> appender = Avro.write(outputFile)
+                .schema(schema)
+                .createWriterFunc(DataWriter::create)
+                .overwrite()
+                .build()) {
+            appender.add(record);
+        }
+    }
+
+    public ArrayList<GenericRecord> readFromAvro(Schema schema, InputFile inputFile) throws IOException {
+        try (AvroIterable<GenericRecord> reader = Avro.read(inputFile)
+                .project(schema)
+                .createReaderFunc(DataReader::create)
+                .build()) {
+            return Lists.newArrayList(reader);
+        }
+    }
+
+    public void writeToOrc(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
+        try (FileAppender<GenericRecord> appender = ORC.write(outputFile)
+                .schema(schema)
+                .createWriterFunc(GenericOrcWriter::buildWriter)
+                .overwrite()
+                .build()) {
+            appender.add(record);
+        }
+    }
+
+    public ArrayList<GenericRecord> readFromOrc(Schema schema, InputFile inputFile) throws IOException {
+        try (CloseableIterable<GenericRecord> reader = ORC.read(inputFile)
+                .project(schema)
+                .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+                .build()) {
+            return Lists.newArrayList(reader);
+        }
+    }
+
+    public void writeToParquet(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
+        try (FileAppender<GenericRecord> appender = Parquet.write(outputFile)
+                .schema(schema)
+                .createWriterFunc(GenericParquetWriter::buildWriter)
+                .overwrite()
+                .build()) {
+            appender.add(record);
+        }
+    }
+
+    public ArrayList<GenericRecord> readFromParquet(Schema schema, InputFile inputFile) throws IOException {
+        try (CloseableIterable<GenericRecord> reader = Parquet.read(inputFile)
+                .project(schema)
+                .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+                .build()) {
+            return Lists.newArrayList(reader);
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
new file mode 100644
index 0000000000..9b75ee9ef6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+public class TestPutIcebergWithHadoopCatalog {
+
+    private TestRunner runner;
+    private PutIceberg processor;
+    private Schema inputSchema;
+
+    private static final Namespace NAMESPACE = Namespace.of("default");
+
+    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "date");
+
+    private static final org.apache.iceberg.Schema DATE_SCHEMA = new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timeMicros", Types.TimeType.get()),
+            Types.NestedField.required(2, "timestampMicros", Types.TimestampType.withZone()),
+            Types.NestedField.required(3, "date", Types.DateType.get())
+    );
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")), StandardCharsets.UTF_8);
+        inputSchema = new Schema.Parser().parse(avroSchema);
+
+        processor = new PutIceberg();
+    }
+
+    private void initRecordReader() throws InitializationException {
+        MockRecordParser readerFactory = new MockRecordParser();
+        RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
+
+        for (RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2015-02-02 15:30:30.800"), Date.valueOf("2015-02-02"));
+        readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2015-02-02 15:30:30.800"), Date.valueOf("2015-02-02"));
+        readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2016-01-02 15:30:30.800"), Date.valueOf("2016-01-02"));
+        readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2017-01-10 15:30:30.800"), Date.valueOf("2017-01-10"));
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
+    }
+
+    private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
+        TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
+        Catalog catalog = catalogService.getCatalog();
+
+        Map<String, String> tableProperties = new HashMap<>();
+        tableProperties.put(TableProperties.FORMAT_VERSION, "2");
+        tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
+
+        catalog.createTable(TABLE_IDENTIFIER, DATE_SCHEMA, spec, tableProperties);
+
+        runner.addControllerService("catalog-service", catalogService);
+        runner.enableControllerService(catalogService);
+
+        runner.setProperty(PutIceberg.CATALOG, "catalog-service");
+
+        return catalog;
+    }
+
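+    // Each test writes four records through PutIceberg and verifies the record count attribute, the number of data files, and the partition folder names produced by the given transform.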
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @ValueSource(strings = {"avro", "orc", "parquet"})
+    public void onTriggerYearTransform(String fileFormat) throws Exception {
+        PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
+                .year("date")
+                .build();
+
+        runner = TestRunners.newTestRunner(processor);
+        initRecordReader();
+        Catalog catalog = initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
+        runner.setProperty(PutIceberg.TABLE_NAME, "date");
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+        Assertions.assertTrue(table.spec().isPartitioned());
+        Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        validateNumberOfDataFiles(table.location(), 3);
+        validatePartitionFolders(table.location(), Arrays.asList("date_year=2015", "date_year=2016", "date_year=2017"));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @ValueSource(strings = {"avro", "orc", "parquet"})
+    public void onTriggerMonthTransform(String fileFormat) throws Exception {
+        PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
+                .month("timestampMicros")
+                .build();
+
+        runner = TestRunners.newTestRunner(processor);
+        initRecordReader();
+        Catalog catalog = initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
+        runner.setProperty(PutIceberg.TABLE_NAME, "date");
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+        Assertions.assertTrue(table.spec().isPartitioned());
+        Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        validateNumberOfDataFiles(table.location(), 3);
+        validatePartitionFolders(table.location(), Arrays.asList(
+                "timestampMicros_month=2015-02", "timestampMicros_month=2016-01", "timestampMicros_month=2017-01"));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @ValueSource(strings = {"avro", "orc", "parquet"})
+    public void onTriggerDayTransform(String fileFormat) throws Exception {
+        PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
+                .day("timestampMicros")
+                .build();
+
+        runner = TestRunners.newTestRunner(processor);
+        initRecordReader();
+        Catalog catalog = initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
+        runner.setProperty(PutIceberg.TABLE_NAME, "date");
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+        Assertions.assertTrue(table.spec().isPartitioned());
+        Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        validateNumberOfDataFiles(table.location(), 3);
+        validatePartitionFolders(table.location(), Arrays.asList(
+                "timestampMicros_day=2015-02-02", "timestampMicros_day=2016-01-02", "timestampMicros_day=2017-01-10"));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
new file mode 100644
index 0000000000..1f7a79258b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
+import org.apache.nifi.processors.iceberg.metastore.ThriftMetastore;
+import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+public class TestPutIcebergWithHiveCatalog {
+
+    private TestRunner runner;
+    private PutIceberg processor;
+    private Schema inputSchema;
+
+    @RegisterExtension
+    public ThriftMetastore metastore = new ThriftMetastore();
+
+    private static final Namespace NAMESPACE = Namespace.of("test_metastore");
+
+    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "users");
+
+    private static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "name", Types.StringType.get()),
+            Types.NestedField.required(3, "department", Types.StringType.get())
+    );
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/user.avsc")), StandardCharsets.UTF_8);
+        inputSchema = new Schema.Parser().parse(avroSchema);
+
+        processor = new PutIceberg();
+    }
+
+    private void initRecordReader() throws InitializationException {
+        MockRecordParser readerFactory = new MockRecordParser();
+        RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
+
+        for (RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        readerFactory.addRecord(0, "John", "Finance");
+        readerFactory.addRecord(1, "Jill", "Finance");
+        readerFactory.addRecord(2, "James", "Marketing");
+        readerFactory.addRecord(3, "Joana", "Sales");
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
+    }
+
+    private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
+        TestHiveCatalogService catalogService = new TestHiveCatalogService(metastore.getThriftConnectionUri(), metastore.getWarehouseLocation());
+        Catalog catalog = catalogService.getCatalog();
+
+        Map<String, String> tableProperties = new HashMap<>();
+        tableProperties.put(TableProperties.FORMAT_VERSION, "2");
+        tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
+
+        catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
+
+        runner.addControllerService("catalog-service", catalogService);
+        runner.enableControllerService(catalogService);
+
+        runner.setProperty(PutIceberg.CATALOG, "catalog-service");
+
+        return catalog;
+    }
+
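+    // Each test writes the same four user records through PutIceberg and verifies the table contents, the number of data files, and the resulting data file layout.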
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @ValueSource(strings = {"avro", "orc", "parquet"})
+    public void onTriggerPartitioned(String fileFormat) throws Exception {
+        PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+                .bucket("department", 3)
+                .build();
+
+        runner = TestRunners.newTestRunner(processor);
+        initRecordReader();
+        Catalog catalog = initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
+        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+        List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+                .add(0, "John", "Finance")
+                .add(1, "Jill", "Finance")
+                .add(2, "James", "Marketing")
+                .add(3, "Joana", "Sales")
+                .build();
+
+        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+        String tableLocation = new URI(table.location()).getPath();
+        Assertions.assertTrue(table.spec().isPartitioned());
+        Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        validateData(table, expectedRecords, 0);
+        validateNumberOfDataFiles(tableLocation, 3);
+        validatePartitionFolders(tableLocation, Arrays.asList(
+                "department_bucket=0", "department_bucket=1", "department_bucket=2"));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @ValueSource(strings = {"avro", "orc", "parquet"})
+    public void onTriggerIdentityPartitioned(String fileFormat) throws Exception {
+        PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+                .identity("department")
+                .build();
+
+        runner = TestRunners.newTestRunner(processor);
+        initRecordReader();
+        Catalog catalog = initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
+        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+        List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+                .add(0, "John", "Finance")
+                .add(1, "Jill", "Finance")
+                .add(2, "James", "Marketing")
+                .add(3, "Joana", "Sales")
+                .build();
+
+        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+        String tableLocation = new URI(table.location()).getPath();
+        Assertions.assertTrue(table.spec().isPartitioned());
+        Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        validateData(table, expectedRecords, 0);
+        validateNumberOfDataFiles(tableLocation, 3);
+        validatePartitionFolders(tableLocation, Arrays.asList(
+                "department=Finance", "department=Marketing", "department=Sales"));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @ValueSource(strings = {"avro", "orc", "parquet"})
+    public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception {
+        PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+                .identity("name")
+                .identity("department")
+                .build();
+
+        runner = TestRunners.newTestRunner(processor);
+        initRecordReader();
+        Catalog catalog = initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
+        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+        List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+                .add(0, "John", "Finance")
+                .add(1, "Jill", "Finance")
+                .add(2, "James", "Marketing")
+                .add(3, "Joana", "Sales")
+                .build();
+
+        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+        String tableLocation = new URI(table.location()).getPath();
+        Assertions.assertTrue(table.spec().isPartitioned());
+        Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        validateData(table, expectedRecords, 0);
+        validateNumberOfDataFiles(tableLocation, 4);
+        validatePartitionFolders(tableLocation, Arrays.asList(
+                "name=James/department=Marketing/",
+                "name=Jill/department=Finance/",
+                "name=Joana/department=Sales/",
+                "name=John/department=Finance/"
+        ));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @ValueSource(strings = {"avro", "orc", "parquet"})
+    public void onTriggerUnPartitioned(String fileFormat) throws Exception {
+        runner = TestRunners.newTestRunner(processor);
+        initRecordReader();
+        Catalog catalog = initCatalog(PartitionSpec.unpartitioned(), fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
+        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+        List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+                .add(0, "John", "Finance")
+                .add(1, "Jill", "Finance")
+                .add(2, "James", "Marketing")
+                .add(3, "Joana", "Sales")
+                .build();
+
+        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+        Assertions.assertTrue(table.spec().isUnpartitioned());
+        Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        validateData(table, expectedRecords, 0);
+        validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
new file mode 100644
index 0000000000..d080067216
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.catalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.File;
+import java.io.IOException;
+
+import static java.nio.file.Files.createTempDirectory;
+
+public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
+
+    private final Catalog catalog;
+
+    public TestHadoopCatalogService() throws IOException {
+        File warehouseLocation = createTempDirectory("metastore").toFile();
+
+        catalog =  new HadoopCatalog(new Configuration(), warehouseLocation.getAbsolutePath());
+    }
+
+    @Override
+    public Catalog getCatalog() {
+        return catalog;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return null;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
new file mode 100644
index 0000000000..a0a5bf441e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.catalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService {
+
+    private Catalog catalog;
+
+    public TestHiveCatalogService(String metastoreUri, String warehouseLocation) {
+        initCatalog(metastoreUri, warehouseLocation);
+    }
+
+    public void initCatalog(String metastoreUri, String warehouseLocation) {
+        catalog = new HiveCatalog();
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CatalogProperties.URI, metastoreUri);
+        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+
+        catalog.initialize("hive-catalog", properties);
+    }
+
+    @Override
+    public Catalog getCatalog() {
+        return catalog;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return null;
+    }
+
+}
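
A minimal sketch of wiring one of these test catalog services into a processor test, assuming
the CATALOG property descriptor that this patch defines on PutIceberg; the service identifier
and the metastore field are illustrative:

    TestRunner runner = TestRunners.newTestRunner(PutIceberg.class);

    TestHiveCatalogService catalogService = new TestHiveCatalogService(
            metastore.getThriftConnectionUri(), metastore.getWarehouseLocation());
    runner.addControllerService("catalog-service", catalogService);
    runner.enableControllerService(catalogService);

    // Points the processor at the controller service registered above.
    runner.setProperty(PutIceberg.CATALOG, "catalog-service");
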
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java
new file mode 100644
index 0000000000..dc917a358c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.metastore;
+
+import org.apache.derby.jdbc.EmbeddedDriver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
+import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
+import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.nio.file.Files.createTempDirectory;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.AUTO_CREATE_ALL;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CONNECTION_DRIVER;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CONNECT_URL_KEY;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_TXN_MANAGER;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HMS_HANDLER_FORCE_RELOAD_CONF;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.SCHEMA_VERIFICATION;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.THRIFT_URIS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.WAREHOUSE;
+
+/** This class wraps the Metastore service's core functionality. */
+class MetastoreCore {
+
+    private final String DB_NAME = "test_metastore";
+
+    private String thriftConnectionUri;
+    private Configuration hiveConf;
+    private HiveMetaStoreClient metastoreClient;
+    private File tempDir;
+    private ExecutorService thriftServer;
+    private TServer server;
+
+    public void initialize() throws IOException, TException, InvocationTargetException, NoSuchMethodException,
+            IllegalAccessException, NoSuchFieldException, SQLException {
+        thriftServer = Executors.newSingleThreadExecutor();
+        tempDir = createTempDirectory("metastore").toFile();
+        setDerbyLogPath();
+        setupDB("jdbc:derby:" + getDerbyPath() + ";create=true");
+
+        server = thriftServer();
+        thriftServer.submit(() -> server.serve());
+
+        metastoreClient = new HiveMetaStoreClient(hiveConf);
+        metastoreClient.createDatabase(new Database(DB_NAME, "description", getDBPath(), new HashMap<>()));
+    }
+
+    public void shutdown() {
+        metastoreClient.close();
+
+        if (server != null) {
+            server.stop();
+        }
+
+        thriftServer.shutdown();
+
+        if (tempDir != null) {
+            tempDir.delete();
+        }
+    }
+
+    private HiveConf hiveConf(int port) {
+        thriftConnectionUri = "thrift://localhost:" + port;
+
+        final HiveConf hiveConf = new HiveConf(new Configuration(), this.getClass());
+        hiveConf.set(THRIFT_URIS.getVarname(), thriftConnectionUri);
+        hiveConf.set(WAREHOUSE.getVarname(), "file:" + tempDir.getAbsolutePath());
+        hiveConf.set(WAREHOUSE.getHiveName(), "file:" + tempDir.getAbsolutePath());
+        hiveConf.set(CONNECTION_DRIVER.getVarname(), EmbeddedDriver.class.getName());
+        hiveConf.set(CONNECT_URL_KEY.getVarname(), "jdbc:derby:" + getDerbyPath() + ";create=true");
+        hiveConf.set(AUTO_CREATE_ALL.getVarname(), "false");
+        hiveConf.set(SCHEMA_VERIFICATION.getVarname(), "false");
+        hiveConf.set(HIVE_TXN_MANAGER.getVarname(), "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+        hiveConf.set(COMPACTOR_INITIATOR_ON.getVarname(), "true");
+        hiveConf.set(COMPACTOR_WORKER_THREADS.getVarname(), "1");
+        hiveConf.set(HIVE_SUPPORT_CONCURRENCY.getVarname(), "true");
+        hiveConf.setBoolean("hcatalog.hive.client.cache.disabled", true);
+
+        hiveConf.set(CONNECTION_POOLING_TYPE.getVarname(), "NONE");
+        hiveConf.set(HMS_HANDLER_FORCE_RELOAD_CONF.getVarname(), "true");
+
+        return hiveConf;
+    }
+
+    private void setDerbyLogPath() throws IOException {
+        final String derbyLog = Files.createTempFile(tempDir.toPath(), "derby", ".log").toString();
+        System.setProperty("derby.stream.error.file", derbyLog);
+    }
+
+    private String getDerbyPath() {
+        return new File(tempDir, "metastore_db").getPath();
+    }
+
+    private TServer thriftServer() throws TTransportException, MetaException, InvocationTargetException,
+            NoSuchMethodException, IllegalAccessException, NoSuchFieldException {
+        final TServerSocketKeepAlive socket = new TServerSocketKeepAlive(new TServerSocket(0));
+        hiveConf = hiveConf(socket.getServerSocket().getLocalPort());
+        final HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", hiveConf);
+        final IHMSHandler handler = RetryingHMSHandler.getProxy(hiveConf, baseHandler, true);
+        final TTransportFactory transportFactory = new TTransportFactory();
+        final TSetIpAddressProcessor<IHMSHandler> processor = new TSetIpAddressProcessor<>(handler);
+
+        TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket)
+                .processor(processor)
+                .transportFactory(transportFactory)
+                .protocolFactory(new TBinaryProtocol.Factory())
+                .minWorkerThreads(3)
+                .maxWorkerThreads(5);
+
+        return new TThreadPoolServer(args);
+    }
+
+    private void setupDB(String dbURL) throws SQLException, IOException {
+        final Connection connection = DriverManager.getConnection(dbURL);
+        ScriptRunner scriptRunner = new ScriptRunner(connection);
+
+        final URL initScript = getClass().getClassLoader().getResource("hive-schema-3.2.0.derby.sql");
+        final Reader reader = new BufferedReader(new FileReader(initScript.getFile()));
+        scriptRunner.runScript(reader);
+    }
+
+    private String getDBPath() {
+        return Paths.get(tempDir.getAbsolutePath(), DB_NAME + ".db").toAbsolutePath().toString();
+    }
+
+    public String getThriftConnectionUri() {
+        return thriftConnectionUri;
+    }
+
+    public String getWarehouseLocation() {
+        return tempDir.getAbsolutePath();
+    }
+
+}
+
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java
new file mode 100644
index 0000000000..d1e7e1808e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.metastore;
+
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.io.Reader;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/** This class is responsible for metastore init script processing and execution. */
+public class ScriptRunner {
+
+    private static final String DEFAULT_DELIMITER = ";";
+
+    private final Connection connection;
+
+    public ScriptRunner(Connection connection) throws SQLException {
+        this.connection = connection;
+        this.connection.setAutoCommit(true);
+    }
+
+    public void runScript(Reader reader) throws IOException, SQLException {
+        try {
+            StringBuilder command = new StringBuilder();
+            LineNumberReader lineReader = new LineNumberReader(reader);
+            String line;
+            while ((line = lineReader.readLine()) != null) {
+                String trimmedLine = line.trim();
+                if (trimmedLine.isEmpty() || trimmedLine.startsWith("--") || trimmedLine.startsWith("//")) {
+                    continue; //Skip comment line
+                } else if (trimmedLine.endsWith(getDelimiter())) {
+                    command.append(line, 0, line.lastIndexOf(getDelimiter()));
+                    command.append(" ");
+                    Statement statement = connection.createStatement();
+
+                    statement.execute(command.toString());
+                    connection.commit();
+
+                    command = new StringBuilder();
+
+                    statement.close();
+                } else {
+                    command.append(line);
+                    command.append(" ");
+                }
+            }
+        } catch (IOException | SQLException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException("Error running metastore init script.", e);
+        } finally {
+            connection.rollback();
+        }
+    }
+
+    private String getDelimiter() {
+        return DEFAULT_DELIMITER;
+    }
+}
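
A minimal sketch of the runner in use, mirroring MetastoreCore#setupDB() above; the in-memory
Derby URL is illustrative:

    // Executes the bundled Hive schema script against an in-memory Derby database.
    try (Connection connection = DriverManager.getConnection("jdbc:derby:memory:metastore_db;create=true")) {
        ScriptRunner scriptRunner = new ScriptRunner(connection);
        URL initScript = getClass().getClassLoader().getResource("hive-schema-3.2.0.derby.sql");
        try (Reader reader = new BufferedReader(new FileReader(initScript.getFile()))) {
            scriptRunner.runScript(reader);
        }
    }
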
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java
new file mode 100644
index 0000000000..910c2df176
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.metastore;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/** A JUnit Extension that creates a Hive Metastore Thrift service backed by a Hive Metastore using an in-memory Derby database. */
+public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
+
+    private final MetastoreCore metastoreCore;
+
+    public ThriftMetastore() {
+        metastoreCore = new MetastoreCore();
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        metastoreCore.initialize();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) {
+        metastoreCore.shutdown();
+    }
+
+    public String getThriftConnectionUri() {
+        return metastoreCore.getThriftConnectionUri();
+    }
+
+    public String getWarehouseLocation() {
+        return metastoreCore.getWarehouseLocation();
+    }
+}
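
A minimal sketch of registering the extension in a test class; this is the same pattern the
Hive catalog tests in this patch use (the field and test names are illustrative):

    // The extension starts a fresh embedded metastore before each test and stops it afterwards.
    @RegisterExtension
    public static ThriftMetastore metastore = new ThriftMetastore();

    @Test
    public void testWithEmbeddedMetastore() {
        TestHiveCatalogService catalogService = new TestHiveCatalogService(
                metastore.getThriftConnectionUri(), metastore.getWarehouseLocation());
        // ... register the service on a TestRunner and run the processor under test
    }
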
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java
new file mode 100644
index 0000000000..a70e368033
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.util;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IcebergTestUtils {
+
+    /**
+     * Validates whether the table contains the expected records. The results should be sorted by a unique key, so we do not end up with flaky tests.
+     *
+     * @param table    The table we should read the records from
+     * @param expected The expected list of Records
+     * @param sortBy   The column position by which we will sort
+     * @throws IOException If an error occurs while reading the table data
+     */
+    public static void validateData(Table table, List<Record> expected, int sortBy) throws IOException {
+        List<Record> records = Lists.newArrayListWithExpectedSize(expected.size());
+        try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+            iterable.forEach(records::add);
+        }
+
+        validateData(expected, records, sortBy);
+    }
+
+    /**
+     * Validates whether the two sets of records are the same. The results should be sorted by a unique key, so we do not end up with flaky tests.
+     *
+     * @param expected The expected list of Records
+     * @param actual   The actual list of Records
+     * @param sortBy   The column position by which we will sort
+     */
+    public static void validateData(List<Record> expected, List<Record> actual, int sortBy) {
+        List<Record> sortedExpected = Lists.newArrayList(expected);
+        List<Record> sortedActual = Lists.newArrayList(actual);
+        // Sort based on the specified field
+        sortedExpected.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
+        sortedActual.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
+
+        assertEquals(sortedExpected.size(), sortedActual.size());
+        for (int i = 0; i < expected.size(); ++i) {
+            assertEquals(sortedExpected.get(i), sortedActual.get(i));
+        }
+    }
+
+    /**
+     * Validates the number of files under a {@link Table}
+     *
+     * @param tableLocation     The location of the table we are checking
+     * @param numberOfDataFiles The expected number of data files (TABLE_LOCATION/data/*)
+     */
+    public static void validateNumberOfDataFiles(String tableLocation, int numberOfDataFiles) throws IOException {
+        List<Path> dataFiles = Files.walk(Paths.get(tableLocation + "/data"))
+                .filter(Files::isRegularFile)
+                .filter(path -> !path.getFileName().toString().startsWith("."))
+                .collect(Collectors.toList());
+
+        assertEquals(numberOfDataFiles, dataFiles.size());
+    }
+
+    public static void validatePartitionFolders(String tableLocation, List<String> partitionPaths) {
+        for (String partitionPath : partitionPaths) {
+            Path path = Paths.get(tableLocation + "/data/" + partitionPath);
+            assertTrue("The expected path doesn't exists: " + path, Files.exists(path));
+        }
+    }
+
+    public static class RecordsBuilder {
+
+        private final List<Record> records = new ArrayList<>();
+        private final Schema schema;
+
+        private RecordsBuilder(Schema schema) {
+            this.schema = schema;
+        }
+
+        public RecordsBuilder add(Object... values) {
+            Validate.isTrue(schema.columns().size() == values.length, "Number of provided values and schema length should be equal.");
+
+            GenericRecord record = GenericRecord.create(schema);
+
+            for (int i = 0; i < values.length; i++) {
+                record.set(i, values[i]);
+            }
+
+            records.add(record);
+            return this;
+        }
+
+        public List<Record> build() {
+            return Collections.unmodifiableList(records);
+        }
+
+        public static RecordsBuilder newInstance(Schema schema) {
+            return new RecordsBuilder(schema);
+        }
+    }
+
+}
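
A minimal sketch of the helpers used together; the schema, values, table and partition path are
illustrative, and the Hive catalog test earlier in this patch follows the same pattern:

    // Builds the expected records, then validates table contents, data file count and partition layout.
    List<Record> expected = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
            .add(0, "John", "Finance")
            .add(1, "Jill", "Finance")
            .build();

    IcebergTestUtils.validateData(table, expected, 0);
    IcebergTestUtils.validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
    IcebergTestUtils.validatePartitionFolders(new URI(table.location()).getPath(),
            Collections.singletonList("department=Finance/"));
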
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/date.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/date.avsc
new file mode 100644
index 0000000000..6fc6010b78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/date.avsc
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "nifi",
+  "name": "data_types",
+  "type": "record",
+  "fields": [
+    {
+    	"name" : "timeMicros",
+    	"type": {
+    		"type": "long",
+    		"logicalType": "time-micros"
+        }
+    },
+    {
+    	"name" : "timestampMicros",
+    	 "type": {
+    		"type" : "long",
+    		"logicalType" : "timestamp-micros"
+        }
+	},
+	{
+        "name" : "date",
+        "type": {
+            "type" : "int",
+            "logicalType" : "date"
+        }
+    }
+  ]
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-3.2.0.derby.sql b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-3.2.0.derby.sql
new file mode 100644
index 0000000000..cbc7c935ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-3.2.0.derby.sql
@@ -0,0 +1,738 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Timestamp: 2011-09-22 15:32:02.024
+-- Source database is: /home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb
+-- Connection URL is: jdbc:derby:/home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb
+-- Specified schema is: APP
+-- appendLogs: false
+
+-- ----------------------------------------------
+-- DDL Statements for functions
+-- ----------------------------------------------
+
+CREATE FUNCTION "APP"."NUCLEUS_ASCII" (C CHAR(1)) RETURNS INTEGER LANGUAGE JAVA PARAMETER STYLE JAVA READS SQL DATA CALLED ON NULL INPUT EXTERNAL NAME 'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.ascii' ;
+
+CREATE FUNCTION "APP"."NUCLEUS_MATCHES" (TEXT VARCHAR(8000),PATTERN VARCHAR(8000)) RETURNS INTEGER LANGUAGE JAVA PARAMETER STYLE JAVA READS SQL DATA CALLED ON NULL INPUT EXTERNAL NAME 'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.matches' ;
+
+-- ----------------------------------------------
+-- DDL Statements for tables
+-- ----------------------------------------------
+CREATE TABLE "APP"."DBS" (
+  "DB_ID" BIGINT NOT NULL,
+  "DESC" VARCHAR(4000),
+  "DB_LOCATION_URI" VARCHAR(4000) NOT NULL,
+  "NAME" VARCHAR(128),
+  "OWNER_NAME" VARCHAR(128),
+  "OWNER_TYPE" VARCHAR(10),
+  "CTLG_NAME" VARCHAR(256) NOT NULL DEFAULT 'hive',
+  "CREATE_TIME" INTEGER
+);
+
+CREATE TABLE "APP"."TBL_PRIVS" ("TBL_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "TBL_PRIV" VARCHAR(128), "TBL_ID" BIGINT, "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."DATABASE_PARAMS" ("DB_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(180) NOT NULL, "PARAM_VALUE" VARCHAR(4000));
+
+CREATE TABLE "APP"."TBL_COL_PRIVS" ("TBL_COLUMN_GRANT_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "TBL_COL_PRIV" VARCHAR(128), "TBL_ID" BIGINT, "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."SERDE_PARAMS" ("SERDE_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."COLUMNS_V2" ("CD_ID" BIGINT NOT NULL, "COMMENT" VARCHAR(4000), "COLUMN_NAME" VARCHAR(767) NOT NULL, "TYPE_NAME" CLOB, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SORT_COLS" ("SD_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "ORDER" INTEGER NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."CDS" ("CD_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."PARTITION_KEY_VALS" ("PART_ID" BIGINT NOT NULL, "PART_KEY_VAL" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."DB_PRIVS" ("DB_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "DB_ID" BIGINT, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "DB_PRIV" VARCHAR(128), "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."IDXS" ("INDEX_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "DEFERRED_REBUILD" CHAR(1) NOT NULL, "INDEX_HANDLER_CLASS" VARCHAR(4000), "INDEX_NAME" VARCHAR(128), "INDEX_TBL_ID" BIGINT, "LAST_ACCESS_TIME" INTEGER NOT NULL, "ORIG_TBL_ID" BIGINT, "SD_ID" BIGINT);
+
+CREATE TABLE "APP"."INDEX_PARAMS" ("INDEX_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(4000));
+
+CREATE TABLE "APP"."PARTITIONS" ("PART_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "LAST_ACCESS_TIME" INTEGER NOT NULL, "PART_NAME" VARCHAR(767), "SD_ID" BIGINT, "TBL_ID" BIGINT);
+
+CREATE TABLE "APP"."SERDES" ("SERDE_ID" BIGINT NOT NULL, "NAME" VARCHAR(128), "SLIB" VARCHAR(4000), "DESCRIPTION" VARCHAR(4000), "SERIALIZER_CLASS" VARCHAR(4000), "DESERIALIZER_CLASS" VARCHAR(4000), SERDE_TYPE INTEGER);
+
+CREATE TABLE "APP"."PART_PRIVS" ("PART_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PART_ID" BIGINT, "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "PART_PRIV" VARCHAR(128), "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."ROLE_MAP" ("ROLE_GRANT_ID" BIGINT NOT NULL, "ADD_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "ROLE_ID" BIGINT);
+
+CREATE TABLE "APP"."TYPES" ("TYPES_ID" BIGINT NOT NULL, "TYPE_NAME" VARCHAR(128), "TYPE1" VARCHAR(767), "TYPE2" VARCHAR(767));
+
+CREATE TABLE "APP"."GLOBAL_PRIVS" ("USER_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "USER_PRIV" VARCHAR(128), "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."PARTITION_PARAMS" ("PART_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."PARTITION_EVENTS" (
+    "PART_NAME_ID" BIGINT NOT NULL,
+    "CAT_NAME" VARCHAR(256),
+    "DB_NAME" VARCHAR(128),
+    "EVENT_TIME" BIGINT NOT NULL,
+    "EVENT_TYPE" INTEGER NOT NULL,
+    "PARTITION_NAME" VARCHAR(767),
+    "TBL_NAME" VARCHAR(256)
+);
+
+CREATE TABLE "APP"."COLUMNS" ("SD_ID" BIGINT NOT NULL, "COMMENT" VARCHAR(256), "COLUMN_NAME" VARCHAR(128) NOT NULL, "TYPE_NAME" VARCHAR(4000) NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."ROLES" ("ROLE_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "OWNER_NAME" VARCHAR(128), "ROLE_NAME" VARCHAR(128));
+
+CREATE TABLE "APP"."TBLS" ("TBL_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "DB_ID" BIGINT, "LAST_ACCESS_TIME" INTEGER NOT NULL, "OWNER" VARCHAR(767), "OWNER_TYPE" VARCHAR(10), "RETENTION" INTEGER NOT NULL, "SD_ID" BIGINT, "TBL_NAME" VARCHAR(256), "TBL_TYPE" VARCHAR(128), "VIEW_EXPANDED_TEXT" LONG VARCHAR, "VIEW_ORIGINAL_TEXT" LONG VARCHAR, "IS_REWRITE_ENABLED" CHAR(1) NOT NULL DEFAULT 'N');
+
+CREATE TABLE "APP"."PARTITION_KEYS" ("TBL_ID" BIGINT NOT NULL, "PKEY_COMMENT" VARCHAR(4000), "PKEY_NAME" VARCHAR(128) NOT NULL, "PKEY_TYPE" VARCHAR(767) NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."PART_COL_PRIVS" ("PART_COLUMN_GRANT_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PART_ID" BIGINT, "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "PART_COL_PRIV" VARCHAR(128), "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."SDS" ("SD_ID" BIGINT NOT NULL, "INPUT_FORMAT" VARCHAR(4000), "IS_COMPRESSED" CHAR(1) NOT NULL, "LOCATION" VARCHAR(4000), "NUM_BUCKETS" INTEGER NOT NULL, "OUTPUT_FORMAT" VARCHAR(4000), "SERDE_ID" BIGINT, "CD_ID" BIGINT, "IS_STOREDASSUBDIRECTORIES" CHAR(1) NOT NULL);
+
+CREATE TABLE "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME" VARCHAR(256) NOT NULL, "NEXT_VAL" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."TAB_COL_STATS"(
+    "CAT_NAME" VARCHAR(256) NOT NULL,
+    "DB_NAME" VARCHAR(128) NOT NULL,
+    "TABLE_NAME" VARCHAR(256) NOT NULL,
+    "COLUMN_NAME" VARCHAR(767) NOT NULL,
+    "COLUMN_TYPE" VARCHAR(128) NOT NULL,
+    "LONG_LOW_VALUE" BIGINT,
+    "LONG_HIGH_VALUE" BIGINT,
+    "DOUBLE_LOW_VALUE" DOUBLE,
+    "DOUBLE_HIGH_VALUE" DOUBLE,
+    "BIG_DECIMAL_LOW_VALUE" VARCHAR(4000),
+    "BIG_DECIMAL_HIGH_VALUE" VARCHAR(4000),
+    "NUM_DISTINCTS" BIGINT,
+    "NUM_NULLS" BIGINT NOT NULL,
+    "AVG_COL_LEN" DOUBLE,
+    "MAX_COL_LEN" BIGINT,
+    "NUM_TRUES" BIGINT,
+    "NUM_FALSES" BIGINT,
+    "LAST_ANALYZED" BIGINT,
+    "CS_ID" BIGINT NOT NULL,
+    "TBL_ID" BIGINT NOT NULL,
+    "BIT_VECTOR" BLOB
+);
+
+CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, "BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."TYPE_FIELDS" ("TYPE_NAME" BIGINT NOT NULL, "COMMENT" VARCHAR(256), "FIELD_NAME" VARCHAR(128) NOT NULL, "FIELD_TYPE" VARCHAR(767) NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."NUCLEUS_TABLES" ("CLASS_NAME" VARCHAR(128) NOT NULL, "TABLE_NAME" VARCHAR(128) NOT NULL, "TYPE" VARCHAR(4) NOT NULL, "OWNER" VARCHAR(2) NOT NULL, "VERSION" VARCHAR(20) NOT NULL, "INTERFACE_NAME" VARCHAR(256) DEFAULT NULL);
+
+CREATE TABLE "APP"."SD_PARAMS" ("SD_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_STRING_LIST_VALUES" ("STRING_LIST_ID" BIGINT NOT NULL, "STRING_LIST_VALUE" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_COL_NAMES" ("SD_ID" BIGINT NOT NULL, "SKEWED_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ("SD_ID" BIGINT NOT NULL, "STRING_LIST_ID_KID" BIGINT NOT NULL, "LOCATION" VARCHAR(4000));
+
+CREATE TABLE "APP"."SKEWED_VALUES" ("SD_ID_OID" BIGINT NOT NULL, "STRING_LIST_ID_EID" BIGINT NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."MASTER_KEYS" ("KEY_ID" INTEGER NOT NULL generated always as identity (start with 1), "MASTER_KEY" VARCHAR(767));
+
+CREATE TABLE "APP"."DELEGATION_TOKENS" ( "TOKEN_IDENT" VARCHAR(767) NOT NULL, "TOKEN" VARCHAR(767));
+
+CREATE TABLE "APP"."PART_COL_STATS"(
+    "CAT_NAME" VARCHAR(256) NOT NULL,
+    "DB_NAME" VARCHAR(128) NOT NULL,
+    "TABLE_NAME" VARCHAR(256) NOT NULL,
+    "PARTITION_NAME" VARCHAR(767) NOT NULL,
+    "COLUMN_NAME" VARCHAR(767) NOT NULL,
+    "COLUMN_TYPE" VARCHAR(128) NOT NULL,
+    "LONG_LOW_VALUE" BIGINT,
+    "LONG_HIGH_VALUE" BIGINT,
+    "DOUBLE_LOW_VALUE" DOUBLE,
+    "DOUBLE_HIGH_VALUE" DOUBLE,
+    "BIG_DECIMAL_LOW_VALUE" VARCHAR(4000),
+    "BIG_DECIMAL_HIGH_VALUE" VARCHAR(4000),
+    "NUM_DISTINCTS" BIGINT,
+    "BIT_VECTOR" BLOB,
+    "NUM_NULLS" BIGINT NOT NULL,
+    "AVG_COL_LEN" DOUBLE,
+    "MAX_COL_LEN" BIGINT,
+    "NUM_TRUES" BIGINT,
+    "NUM_FALSES" BIGINT,
+    "LAST_ANALYZED" BIGINT,
+    "CS_ID" BIGINT NOT NULL,
+    "PART_ID" BIGINT NOT NULL
+);
+
+CREATE TABLE "APP"."VERSION" ("VER_ID" BIGINT NOT NULL, "SCHEMA_VERSION" VARCHAR(127) NOT NULL, "VERSION_COMMENT" VARCHAR(255));
+
+CREATE TABLE "APP"."FUNCS" ("FUNC_ID" BIGINT NOT NULL, "CLASS_NAME" VARCHAR(4000), "CREATE_TIME" INTEGER NOT NULL, "DB_ID" BIGINT, "FUNC_NAME" VARCHAR(128), "FUNC_TYPE" INTEGER NOT NULL, "OWNER_NAME" VARCHAR(128), "OWNER_TYPE" VARCHAR(10));
+
+CREATE TABLE "APP"."FUNC_RU" ("FUNC_ID" BIGINT NOT NULL, "RESOURCE_TYPE" INTEGER NOT NULL, "RESOURCE_URI" VARCHAR(4000), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."NOTIFICATION_LOG" (
+    "NL_ID" BIGINT NOT NULL,
+    "CAT_NAME" VARCHAR(256),
+    "DB_NAME" VARCHAR(128),
+    "EVENT_ID" BIGINT NOT NULL,
+    "EVENT_TIME" INTEGER NOT NULL,
+    "EVENT_TYPE" VARCHAR(32) NOT NULL,
+    "MESSAGE" CLOB,
+    "TBL_NAME" VARCHAR(256),
+    "MESSAGE_FORMAT" VARCHAR(16)
+);
+
+CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID" BIGINT NOT NULL, "NEXT_EVENT_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."KEY_CONSTRAINTS" ("CHILD_CD_ID" BIGINT, "CHILD_INTEGER_IDX" INTEGER, "CHILD_TBL_ID" BIGINT, "PARENT_CD_ID" BIGINT , "PARENT_INTEGER_IDX" INTEGER, "PARENT_TBL_ID" BIGINT NOT NULL,  "POSITION" BIGINT NOT NULL, "CONSTRAINT_NAME" VARCHAR(400) NOT NULL, "CONSTRAINT_TYPE" SMALLINT NOT NULL, "UPDATE_RULE" SMALLINT, "DELETE_RULE" SMALLINT, "ENABLE_VALIDATE_RELY" SMALLINT NOT NULL, "DEFAULT_VALUE" VARCHAR(400));
+
+CREATE TABLE "APP"."METASTORE_DB_PROPERTIES" ("PROPERTY_KEY" VARCHAR(255) NOT NULL, "PROPERTY_VALUE" VARCHAR(1000) NOT NULL, "DESCRIPTION" VARCHAR(1000));
+
+CREATE TABLE "APP"."WM_RESOURCEPLAN" (RP_ID BIGINT NOT NULL, NAME VARCHAR(128) NOT NULL, QUERY_PARALLELISM INTEGER, STATUS VARCHAR(20) NOT NULL, DEFAULT_POOL_ID BIGINT);
+
+CREATE TABLE "APP"."WM_POOL" (POOL_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, PATH VARCHAR(1024) NOT NULL, ALLOC_FRACTION DOUBLE, QUERY_PARALLELISM INTEGER, SCHEDULING_POLICY VARCHAR(1024));
+
+CREATE TABLE "APP"."WM_TRIGGER" (TRIGGER_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, NAME VARCHAR(128) NOT NULL, TRIGGER_EXPRESSION VARCHAR(1024), ACTION_EXPRESSION VARCHAR(1024), IS_IN_UNMANAGED INTEGER NOT NULL DEFAULT 0);
+
+CREATE TABLE "APP"."WM_POOL_TO_TRIGGER"  (POOL_ID BIGINT NOT NULL, TRIGGER_ID BIGINT NOT NULL);
+
+CREATE TABLE "APP"."WM_MAPPING" (MAPPING_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, ENTITY_TYPE VARCHAR(128) NOT NULL, ENTITY_NAME VARCHAR(128) NOT NULL, POOL_ID BIGINT, ORDERING INTEGER);
+
+CREATE TABLE "APP"."MV_CREATION_METADATA" (
+  "MV_CREATION_METADATA_ID" BIGINT NOT NULL,
+  "CAT_NAME" VARCHAR(256) NOT NULL,
+  "DB_NAME" VARCHAR(128) NOT NULL,
+  "TBL_NAME" VARCHAR(256) NOT NULL,
+  "TXN_LIST" CLOB,
+  "MATERIALIZATION_TIME" BIGINT NOT NULL
+);
+
+CREATE TABLE "APP"."MV_TABLES_USED" (
+  "MV_CREATION_METADATA_ID" BIGINT NOT NULL,
+  "TBL_ID" BIGINT NOT NULL
+);
+
+CREATE TABLE "APP"."CTLGS" (
+    "CTLG_ID" BIGINT NOT NULL,
+    "NAME" VARCHAR(256) UNIQUE,
+    "DESC" VARCHAR(4000),
+    "LOCATION_URI" VARCHAR(4000) NOT NULL,
+    "CREATE_TIME" INTEGER);
+
+-- Insert a default value.  The location is TBD.  Hive will fix this when it starts
+INSERT INTO "APP"."CTLGS" VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL);
+
+-- ----------------------------------------------
+-- DML Statements
+-- ----------------------------------------------
+
+INSERT INTO "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID", "NEXT_EVENT_ID") SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT "NEXT_EVENT_ID" FROM "APP"."NOTIFICATION_SEQUENCE");
+
+INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1)) tmp_table WHERE NOT EXISTS ( SELECT "NEXT_VAL" FROM "APP"."SEQUENCE_TABLE" WHERE "SEQUENCE_NAME" = 'org.apache.hadoop.hive.metastore.model.MNotificationLog');
+
+-- ----------------------------------------------
+-- DDL Statements for indexes
+-- ----------------------------------------------
+
+CREATE UNIQUE INDEX "APP"."UNIQUEINDEX" ON "APP"."IDXS" ("INDEX_NAME", "ORIG_TBL_ID");
+
+CREATE INDEX "APP"."TABLECOLUMNPRIVILEGEINDEX" ON "APP"."TBL_COL_PRIVS" ("AUTHORIZER", "TBL_ID", "COLUMN_NAME", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "TBL_COL_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."DBPRIVILEGEINDEX" ON "APP"."DB_PRIVS" ("AUTHORIZER", "DB_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "DB_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE INDEX "APP"."PCS_STATS_IDX" ON "APP"."PART_COL_STATS" ("CAT_NAME", "DB_NAME","TABLE_NAME","COLUMN_NAME","PARTITION_NAME");
+
+CREATE INDEX "APP"."TAB_COL_STATS_IDX" ON "APP"."TAB_COL_STATS" ("CAT_NAME", "DB_NAME", "TABLE_NAME", "COLUMN_NAME");
+
+CREATE INDEX "APP"."PARTPRIVILEGEINDEX" ON "APP"."PART_PRIVS" ("AUTHORIZER", "PART_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "PART_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."ROLEENTITYINDEX" ON "APP"."ROLES" ("ROLE_NAME");
+
+CREATE INDEX "APP"."TABLEPRIVILEGEINDEX" ON "APP"."TBL_PRIVS" ("AUTHORIZER", "TBL_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "TBL_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUETABLE" ON "APP"."TBLS" ("TBL_NAME", "DB_ID");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_DATABASE" ON "APP"."DBS" ("NAME", "CTLG_NAME");
+
+CREATE UNIQUE INDEX "APP"."USERROLEMAPINDEX" ON "APP"."ROLE_MAP" ("PRINCIPAL_NAME", "ROLE_ID", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."GLOBALPRIVILEGEINDEX" ON "APP"."GLOBAL_PRIVS" ("AUTHORIZER", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "USER_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_TYPE" ON "APP"."TYPES" ("TYPE_NAME");
+
+CREATE INDEX "APP"."PARTITIONCOLUMNPRIVILEGEINDEX" ON "APP"."PART_COL_PRIVS" ("AUTHORIZER", "PART_ID", "COLUMN_NAME", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "PART_COL_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUEPARTITION" ON "APP"."PARTITIONS" ("PART_NAME", "TBL_ID");
+
+CREATE UNIQUE INDEX "APP"."UNIQUEFUNCTION" ON "APP"."FUNCS" ("FUNC_NAME", "DB_ID");
+
+CREATE INDEX "APP"."FUNCS_N49" ON "APP"."FUNCS" ("DB_ID");
+
+CREATE INDEX "APP"."FUNC_RU_N49" ON "APP"."FUNC_RU" ("FUNC_ID");
+
+CREATE INDEX "APP"."CONSTRAINTS_PARENT_TBL_ID_INDEX" ON "APP"."KEY_CONSTRAINTS"("PARENT_TBL_ID");
+
+CREATE INDEX "APP"."CONSTRAINTS_CONSTRAINT_TYPE_INDEX" ON "APP"."KEY_CONSTRAINTS"("CONSTRAINT_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_WM_RESOURCEPLAN" ON "APP"."WM_RESOURCEPLAN" ("NAME");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_WM_POOL" ON "APP"."WM_POOL" ("RP_ID", "PATH");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_WM_TRIGGER" ON "APP"."WM_TRIGGER" ("RP_ID", "NAME");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_WM_MAPPING" ON "APP"."WM_MAPPING" ("RP_ID", "ENTITY_TYPE", "ENTITY_NAME");
+
+CREATE UNIQUE INDEX "APP"."MV_UNIQUE_TABLE" ON "APP"."MV_CREATION_METADATA" ("TBL_NAME", "DB_NAME");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_CATALOG" ON "APP"."CTLGS" ("NAME");
+
+
+-- ----------------------------------------------
+-- DDL Statements for keys
+-- ----------------------------------------------
+
+-- primary/unique
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "IDXS_PK" PRIMARY KEY ("INDEX_ID");
+
+ALTER TABLE "APP"."TBL_COL_PRIVS" ADD CONSTRAINT "TBL_COL_PRIVS_PK" PRIMARY KEY ("TBL_COLUMN_GRANT_ID");
+
+ALTER TABLE "APP"."CDS" ADD CONSTRAINT "SQL110922153006460" PRIMARY KEY ("CD_ID");
+
+ALTER TABLE "APP"."DB_PRIVS" ADD CONSTRAINT "DB_PRIVS_PK" PRIMARY KEY ("DB_GRANT_ID");
+
+ALTER TABLE "APP"."INDEX_PARAMS" ADD CONSTRAINT "INDEX_PARAMS_PK" PRIMARY KEY ("INDEX_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."PARTITION_KEYS" ADD CONSTRAINT "PARTITION_KEY_PK" PRIMARY KEY ("TBL_ID", "PKEY_NAME");
+
+ALTER TABLE "APP"."SEQUENCE_TABLE" ADD CONSTRAINT "SEQUENCE_TABLE_PK" PRIMARY KEY ("SEQUENCE_NAME");
+
+ALTER TABLE "APP"."PART_PRIVS" ADD CONSTRAINT "PART_PRIVS_PK" PRIMARY KEY ("PART_GRANT_ID");
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_PK" PRIMARY KEY ("SD_ID");
+
+ALTER TABLE "APP"."SERDES" ADD CONSTRAINT "SERDES_PK" PRIMARY KEY ("SERDE_ID");
+
+ALTER TABLE "APP"."COLUMNS" ADD CONSTRAINT "COLUMNS_PK" PRIMARY KEY ("SD_ID", "COLUMN_NAME");
+
+ALTER TABLE "APP"."PARTITION_EVENTS" ADD CONSTRAINT "PARTITION_EVENTS_PK" PRIMARY KEY ("PART_NAME_ID");
+
+ALTER TABLE "APP"."TYPE_FIELDS" ADD CONSTRAINT "TYPE_FIELDS_PK" PRIMARY KEY ("TYPE_NAME", "FIELD_NAME");
+
+ALTER TABLE "APP"."ROLES" ADD CONSTRAINT "ROLES_PK" PRIMARY KEY ("ROLE_ID");
+
+ALTER TABLE "APP"."TBL_PRIVS" ADD CONSTRAINT "TBL_PRIVS_PK" PRIMARY KEY ("TBL_GRANT_ID");
+
+ALTER TABLE "APP"."SERDE_PARAMS" ADD CONSTRAINT "SERDE_PARAMS_PK" PRIMARY KEY ("SERDE_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."NUCLEUS_TABLES" ADD CONSTRAINT "NUCLEUS_TABLES_PK" PRIMARY KEY ("CLASS_NAME");
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_PK" PRIMARY KEY ("TBL_ID");
+
+ALTER TABLE "APP"."SD_PARAMS" ADD CONSTRAINT "SD_PARAMS_PK" PRIMARY KEY ("SD_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."DATABASE_PARAMS" ADD CONSTRAINT "DATABASE_PARAMS_PK" PRIMARY KEY ("DB_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_PK" PRIMARY KEY ("DB_ID");
+
+ALTER TABLE "APP"."ROLE_MAP" ADD CONSTRAINT "ROLE_MAP_PK" PRIMARY KEY ("ROLE_GRANT_ID");
+
+ALTER TABLE "APP"."GLOBAL_PRIVS" ADD CONSTRAINT "GLOBAL_PRIVS_PK" PRIMARY KEY ("USER_GRANT_ID");
+
+ALTER TABLE "APP"."BUCKETING_COLS" ADD CONSTRAINT "BUCKETING_COLS_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SORT_COLS" ADD CONSTRAINT "SORT_COLS_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."PARTITION_KEY_VALS" ADD CONSTRAINT "PARTITION_KEY_VALS_PK" PRIMARY KEY ("PART_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."TYPES" ADD CONSTRAINT "TYPES_PK" PRIMARY KEY ("TYPES_ID");
+
+ALTER TABLE "APP"."COLUMNS_V2" ADD CONSTRAINT "SQL110922153006740" PRIMARY KEY ("CD_ID", "COLUMN_NAME");
+
+ALTER TABLE "APP"."PART_COL_PRIVS" ADD CONSTRAINT "PART_COL_PRIVS_PK" PRIMARY KEY ("PART_COLUMN_GRANT_ID");
+
+ALTER TABLE "APP"."PARTITION_PARAMS" ADD CONSTRAINT "PARTITION_PARAMS_PK" PRIMARY KEY ("PART_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."PARTITIONS" ADD CONSTRAINT "PARTITIONS_PK" PRIMARY KEY ("PART_ID");
+
+ALTER TABLE "APP"."TABLE_PARAMS" ADD CONSTRAINT "TABLE_PARAMS_PK" PRIMARY KEY ("TBL_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST" ADD CONSTRAINT "SKEWED_STRING_LIST_PK" PRIMARY KEY ("STRING_LIST_ID");
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" ADD CONSTRAINT "SKEWED_STRING_LIST_VALUES_PK" PRIMARY KEY ("STRING_LIST_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SKEWED_COL_NAMES" ADD CONSTRAINT "SKEWED_COL_NAMES_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_PK" PRIMARY KEY ("SD_ID", "STRING_LIST_ID_KID");
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_PK" PRIMARY KEY ("SD_ID_OID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."TAB_COL_STATS" ADD CONSTRAINT "TAB_COL_STATS_PK" PRIMARY KEY ("CS_ID");
+
+ALTER TABLE "APP"."PART_COL_STATS" ADD CONSTRAINT "PART_COL_STATS_PK" PRIMARY KEY ("CS_ID");
+
+ALTER TABLE "APP"."FUNCS" ADD CONSTRAINT "FUNCS_PK" PRIMARY KEY ("FUNC_ID");
+
+ALTER TABLE "APP"."FUNC_RU" ADD CONSTRAINT "FUNC_RU_PK" PRIMARY KEY ("FUNC_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."NOTIFICATION_LOG" ADD CONSTRAINT "NOTIFICATION_LOG_PK" PRIMARY KEY ("NL_ID");
+
+ALTER TABLE "APP"."NOTIFICATION_SEQUENCE" ADD CONSTRAINT "NOTIFICATION_SEQUENCE_PK" PRIMARY KEY ("NNI_ID");
+
+ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY ("CONSTRAINT_NAME", "POSITION");
+
+ALTER TABLE "APP"."METASTORE_DB_PROPERTIES" ADD CONSTRAINT "PROPERTY_KEY_PK" PRIMARY KEY ("PROPERTY_KEY");
+
+ALTER TABLE "APP"."MV_CREATION_METADATA" ADD CONSTRAINT "MV_CREATION_METADATA_PK" PRIMARY KEY ("MV_CREATION_METADATA_ID");
+
+ALTER TABLE "APP"."CTLGS" ADD CONSTRAINT "CTLG_PK" PRIMARY KEY ("CTLG_ID");
+
+
+-- foreign
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "IDXS_FK1" FOREIGN KEY ("ORIG_TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "IDXS_FK2" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "IDXS_FK3" FOREIGN KEY ("INDEX_TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBL_COL_PRIVS" ADD CONSTRAINT "TBL_COL_PRIVS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DB_PRIVS" ADD CONSTRAINT "DB_PRIVS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."INDEX_PARAMS" ADD CONSTRAINT "INDEX_PARAMS_FK1" FOREIGN KEY ("INDEX_ID") REFERENCES "APP"."IDXS" ("INDEX_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITION_KEYS" ADD CONSTRAINT "PARTITION_KEYS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PART_PRIVS" ADD CONSTRAINT "PART_PRIVS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_FK1" FOREIGN KEY ("SERDE_ID") REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_FK2" FOREIGN KEY ("CD_ID") REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."COLUMNS" ADD CONSTRAINT "COLUMNS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TYPE_FIELDS" ADD CONSTRAINT "TYPE_FIELDS_FK1" FOREIGN KEY ("TYPE_NAME") REFERENCES "APP"."TYPES" ("TYPES_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBL_PRIVS" ADD CONSTRAINT "TBL_PRIVS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SERDE_PARAMS" ADD CONSTRAINT "SERDE_PARAMS_FK1" FOREIGN KEY ("SERDE_ID") REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_FK2" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_FK1" FOREIGN KEY ("CTLG_NAME") REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SD_PARAMS" ADD CONSTRAINT "SD_PARAMS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DATABASE_PARAMS" ADD CONSTRAINT "DATABASE_PARAMS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."ROLE_MAP" ADD CONSTRAINT "ROLE_MAP_FK1" FOREIGN KEY ("ROLE_ID") REFERENCES "APP"."ROLES" ("ROLE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."BUCKETING_COLS" ADD CONSTRAINT "BUCKETING_COLS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SORT_COLS" ADD CONSTRAINT "SORT_COLS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITION_KEY_VALS" ADD CONSTRAINT "PARTITION_KEY_VALS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."COLUMNS_V2" ADD CONSTRAINT "COLUMNS_V2_FK1" FOREIGN KEY ("CD_ID") REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PART_COL_PRIVS" ADD CONSTRAINT "PART_COL_PRIVS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITION_PARAMS" ADD CONSTRAINT "PARTITION_PARAMS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITIONS" ADD CONSTRAINT "PARTITIONS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITIONS" ADD CONSTRAINT "PARTITIONS_FK2" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TABLE_PARAMS" ADD CONSTRAINT "TABLE_PARAMS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" ADD CONSTRAINT "SKEWED_STRING_LIST_VALUES_FK1" FOREIGN KEY ("STRING_LIST_ID") REFERENCES "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_NAMES" ADD CONSTRAINT "SKEWED_COL_NAMES_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_FK2" FOREIGN KEY ("STRING_LIST_ID_KID") REFERENCES "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_FK1" FOREIGN KEY ("SD_ID_OID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_FK2" FOREIGN KEY ("STRING_LIST_ID_EID") REFERENCES "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TAB_COL_STATS" ADD CONSTRAINT "TAB_COL_STATS_FK" FOREIGN KEY ("TBL_ID") REFERENCES TBLS("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PART_COL_STATS" ADD CONSTRAINT "PART_COL_STATS_FK" FOREIGN KEY ("PART_ID") REFERENCES PARTITIONS("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."VERSION" ADD CONSTRAINT "VERSION_PK" PRIMARY KEY ("VER_ID");
+
+ALTER TABLE "APP"."FUNCS" ADD CONSTRAINT "FUNCS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."FUNC_RU" ADD CONSTRAINT "FUNC_RU_FK1" FOREIGN KEY ("FUNC_ID") REFERENCES "APP"."FUNCS" ("FUNC_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_RESOURCEPLAN" ADD CONSTRAINT "WM_RESOURCEPLAN_PK" PRIMARY KEY ("RP_ID");
+
+ALTER TABLE "APP"."WM_POOL" ADD CONSTRAINT "WM_POOL_PK" PRIMARY KEY ("POOL_ID");
+
+ALTER TABLE "APP"."WM_POOL" ADD CONSTRAINT "WM_POOL_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_RESOURCEPLAN" ADD CONSTRAINT "WM_RESOURCEPLAN_FK1" FOREIGN KEY ("DEFAULT_POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_TRIGGER" ADD CONSTRAINT "WM_TRIGGER_PK" PRIMARY KEY ("TRIGGER_ID");
+
+ALTER TABLE "APP"."WM_TRIGGER" ADD CONSTRAINT "WM_TRIGGER_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" ADD CONSTRAINT "WM_POOL_TO_TRIGGER_FK1" FOREIGN KEY ("POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" ADD CONSTRAINT "WM_POOL_TO_TRIGGER_FK2" FOREIGN KEY ("TRIGGER_ID") REFERENCES "APP"."WM_TRIGGER" ("TRIGGER_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_MAPPING" ADD CONSTRAINT "WM_MAPPING_PK" PRIMARY KEY ("MAPPING_ID");
+
+ALTER TABLE "APP"."WM_MAPPING" ADD CONSTRAINT "WM_MAPPING_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_MAPPING" ADD CONSTRAINT "WM_MAPPING_FK2" FOREIGN KEY ("POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_FK1" FOREIGN KEY ("MV_CREATION_METADATA_ID") REFERENCES "APP"."MV_CREATION_METADATA" ("MV_CREATION_METADATA_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_FK2" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_CTLG_FK" FOREIGN KEY ("CTLG_NAME") REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+-- ----------------------------------------------
+-- DDL Statements for checks
+-- ----------------------------------------------
+
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "SQL110318025504980" CHECK (DEFERRED_REBUILD IN ('Y','N'));
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED IN ('Y','N'));
+
+-- ----------------------------
+-- Transaction and Lock Tables
+-- ----------------------------
+CREATE TABLE TXNS (
+  TXN_ID bigint PRIMARY KEY,
+  TXN_STATE char(1) NOT NULL,
+  TXN_STARTED bigint NOT NULL,
+  TXN_LAST_HEARTBEAT bigint NOT NULL,
+  TXN_USER varchar(128) NOT NULL,
+  TXN_HOST varchar(128) NOT NULL,
+  TXN_AGENT_INFO varchar(128),
+  TXN_META_INFO varchar(128),
+  TXN_HEARTBEAT_COUNT integer,
+  TXN_TYPE integer
+);
+
+CREATE TABLE TXN_COMPONENTS (
+  TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID),
+  TC_DATABASE varchar(128) NOT NULL,
+  TC_TABLE varchar(128),
+  TC_PARTITION varchar(767),
+  TC_OPERATION_TYPE char(1) NOT NULL,
+  TC_WRITEID bigint
+);
+
+CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+  CTC_TXNID bigint NOT NULL,
+  CTC_DATABASE varchar(128) NOT NULL,
+  CTC_TABLE varchar(256),
+  CTC_PARTITION varchar(767),
+  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  CTC_WRITEID bigint,
+  CTC_UPDATE_DELETE char(1) NOT NULL
+);
+
+CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
+
+CREATE TABLE NEXT_TXN_ID (
+  NTXN_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+  HL_LOCK_EXT_ID bigint NOT NULL,
+  HL_LOCK_INT_ID bigint NOT NULL,
+  HL_TXNID bigint NOT NULL,
+  HL_DB varchar(128) NOT NULL,
+  HL_TABLE varchar(128),
+  HL_PARTITION varchar(767),
+  HL_LOCK_STATE char(1) NOT NULL,
+  HL_LOCK_TYPE char(1) NOT NULL,
+  HL_LAST_HEARTBEAT bigint NOT NULL,
+  HL_ACQUIRED_AT bigint,
+  HL_USER varchar(128) NOT NULL,
+  HL_HOST varchar(128) NOT NULL,
+  HL_HEARTBEAT_COUNT integer,
+  HL_AGENT_INFO varchar(128),
+  HL_BLOCKEDBY_EXT_ID bigint,
+  HL_BLOCKEDBY_INT_ID bigint,
+  PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+);
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+  NL_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+  CQ_ID bigint PRIMARY KEY,
+  CQ_DATABASE varchar(128) NOT NULL,
+  CQ_TABLE varchar(128) NOT NULL,
+  CQ_PARTITION varchar(767),
+  CQ_STATE char(1) NOT NULL,
+  CQ_TYPE char(1) NOT NULL,
+  CQ_TBLPROPERTIES varchar(2048),
+  CQ_WORKER_ID varchar(128),
+  CQ_START bigint,
+  CQ_RUN_AS varchar(128),
+  CQ_HIGHEST_WRITE_ID bigint,
+  CQ_META_INFO varchar(2048) for bit data,
+  CQ_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+  NCQ_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+  CC_ID bigint PRIMARY KEY,
+  CC_DATABASE varchar(128) NOT NULL,
+  CC_TABLE varchar(128) NOT NULL,
+  CC_PARTITION varchar(767),
+  CC_STATE char(1) NOT NULL,
+  CC_TYPE char(1) NOT NULL,
+  CC_TBLPROPERTIES varchar(2048),
+  CC_WORKER_ID varchar(128),
+  CC_START bigint,
+  CC_END bigint,
+  CC_RUN_AS varchar(128),
+  CC_HIGHEST_WRITE_ID bigint,
+  CC_META_INFO varchar(2048) for bit data,
+  CC_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE AUX_TABLE (
+  MT_KEY1 varchar(128) NOT NULL,
+  MT_KEY2 bigint NOT NULL,
+  MT_COMMENT varchar(255),
+  PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+--The first 4 columns make up a PK, but since WS_PARTITION is nullable we can't declare such a PK
+--This is a good candidate for an index-organized table
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
+
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TBL_TO_TXN_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+CREATE UNIQUE INDEX TBL_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_WRITEID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
+
+CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
+  MRL_TXN_ID BIGINT NOT NULL,
+  MRL_DB_NAME VARCHAR(128) NOT NULL,
+  MRL_TBL_NAME VARCHAR(256) NOT NULL,
+  MRL_LAST_HEARTBEAT BIGINT NOT NULL,
+  PRIMARY KEY(MRL_TXN_ID)
+);
+
+CREATE TABLE "APP"."I_SCHEMA" (
+  "SCHEMA_ID" bigint primary key,
+  "SCHEMA_TYPE" integer not null,
+  "NAME" varchar(256) unique,
+  "DB_ID" bigint references "APP"."DBS" ("DB_ID"),
+  "COMPATIBILITY" integer not null,
+  "VALIDATION_LEVEL" integer not null,
+  "CAN_EVOLVE" char(1) not null,
+  "SCHEMA_GROUP" varchar(256),
+  "DESCRIPTION" varchar(4000)
+);
+
+CREATE TABLE "APP"."SCHEMA_VERSION" (
+  "SCHEMA_VERSION_ID" bigint primary key,
+  "SCHEMA_ID" bigint references "APP"."I_SCHEMA" ("SCHEMA_ID"),
+  "VERSION" integer not null,
+  "CREATED_AT" bigint not null,
+  "CD_ID" bigint references "APP"."CDS" ("CD_ID"),
+  "STATE" integer not null,
+  "DESCRIPTION" varchar(4000),
+  "SCHEMA_TEXT" clob,
+  "FINGERPRINT" varchar(256),
+  "SCHEMA_VERSION_NAME" varchar(256),
+  "SERDE_ID" bigint references "APP"."SERDES" ("SERDE_ID")
+);
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_SCHEMA_VERSION" ON "APP"."SCHEMA_VERSION" ("SCHEMA_ID", "VERSION");
+
+CREATE TABLE REPL_TXN_MAP (
+  RTM_REPL_POLICY varchar(256) NOT NULL,
+  RTM_SRC_TXN_ID bigint NOT NULL,
+  RTM_TARGET_TXN_ID bigint NOT NULL,
+  PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
+);
+
+CREATE TABLE "APP"."RUNTIME_STATS" (
+  "RS_ID" bigint primary key,
+  "CREATE_TIME" integer not null,
+  "WEIGHT" integer not null,
+  "PAYLOAD" BLOB
+);
+
+CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(767) NOT NULL,
+  WNL_TABLE_OBJ clob NOT NULL,
+  WNL_PARTITION_OBJ clob,
+  WNL_FILES clob,
+  WNL_EVENT_TIME integer NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
+-- -----------------------------------------------------------------
+-- Record schema version. Should be the last step in the init script
+-- -----------------------------------------------------------------
+INSERT INTO "APP"."VERSION" (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) VALUES (1, '3.2.0', 'Hive release version 3.2.0');
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
new file mode 100644
index 0000000000..5cdac5fe8c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "nifi",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "id",  "type": ["int", "null"]},
+     {"name": "name", "type": ["string", "null"]},
+     {"name": "department", "type": ["string", "null"]}
+ ]
+}
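
For orientation, the following is a minimal sketch (not part of the commit) showing how a record conforming to the User test schema above could be built with the standard Avro Java API; the file path and field values are illustrative assumptions.

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class UserRecordExample {
        public static void main(String[] args) throws IOException {
            // Parse the test schema shipped with the processor tests (path assumed).
            Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));

            // Populate a generic record with the three nullable fields defined above.
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", 1);
            user.put("name", "John");
            user.put("department", "Finance");

            System.out.println(user);
        }
    }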
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
new file mode 100644
index 0000000000..1c0fada816
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-services-api-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..6effaa89d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..3e58f19e5b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,501 @@
+nifi-iceberg-services-api-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+  (ASLv2) Apache Iceberg
+    The following NOTICE information applies:
+      Apache Iceberg
+      Copyright 2017-2022 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Apache Ant
+    The following NOTICE information applies:
+      Apache Ant
+      Copyright 1999-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Codec
+      The following NOTICE information applies:
+        Apache Commons Codec
+        Copyright 2002-2014 The Apache Software Foundation
+
+        src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+        contains test data from http://aspell.net/test/orig/batch0.tab.
+        Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+        ===============================================================================
+
+        The content of package org.apache.commons.codec.language.bm has been translated
+        from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+        with permission from the original authors.
+        Original source copyright:
+        Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons DBCP
+    The following NOTICE information applies:
+      Apache Commons DBCP
+      Copyright 2001-2015 The Apache Software Foundation.
+
+  (ASLv2) Apache HttpComponents
+      The following NOTICE information applies:
+        Apache HttpComponents Client
+        Copyright 1999-2016 The Apache Software Foundation
+        Apache HttpComponents Core - HttpCore
+        Copyright 2006-2009 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Pool
+    The following NOTICE information applies:
+      Apache Commons Pool
+      Copyright 1999-2009 The Apache Software Foundation.
+
+  (ASLv2) Apache Commons BeanUtils
+    The following NOTICE information applies:
+      Apache Commons BeanUtils
+      Copyright 2000-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Net
+      The following NOTICE information applies:
+        Apache Commons Net
+        Copyright 2001-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2017 The Apache Software Foundation
+
+  (ASLv2) Apache Parquet
+    The following NOTICE information applies:
+      Apache Parquet MR (Incubating)
+      Copyright 2014 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Audience Annotations
+    The following NOTICE information applies:
+      Apache Yetus
+      Copyright 2008-2018 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Compress
+    The following NOTICE information applies:
+      Apache Commons Compress
+      Copyright 2002-2017 The Apache Software Foundation
+
+      The files in the package org.apache.commons.compress.archivers.sevenz
+      were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+      which has been placed in the public domain:
+
+      "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
+
+  (ASLv2) Apache Commons Configuration
+    The following NOTICE information applies:
+      Apache Commons Configuration
+      Copyright 2001-2017 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Text
+    The following NOTICE information applies:
+      Apache Commons Text
+      Copyright 2001-2018 The Apache Software Foundation
+
+  (ASLv2) Apache Commons CLI
+    The following NOTICE information applies:
+        Apache Commons CLI
+        Copyright 2001-2017 The Apache Software Foundation
+
+        This product includes software developed at
+        The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Apache Commons Collections
+    The following NOTICE information applies:
+      Apache Commons Collections
+      Copyright 2001-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Daemon
+    The following NOTICE information applies:
+      Apache Commons Daemon
+      Copyright 2002-2013 The Apache Software Foundation
+
+  (ASLv2) Apache Ivy
+      The following NOTICE information applies:
+         Copyright 2007-2017 The Apache Software Foundation
+
+         This product includes software developed at
+         The Apache Software Foundation (http://www.apache.org/).
+
+         Portions of Ivy were originally developed at
+         Jayasoft SARL (http://www.jayasoft.fr/)
+         and are licensed to the Apache Software Foundation under the
+         "Software Grant License Agreement"
+
+         SSH and SFTP support is provided by the JCraft JSch package,
+         which is open source software, available under
+         the terms of a BSD style license.
+         The original software and related information is available
+         at http://www.jcraft.com/jsch/.
+
+  (ASLv2) Apache Commons Math
+    The following NOTICE information applies:
+      Apache Commons Math
+      Copyright 2001-2012 The Apache Software Foundation
+
+  (ASLv2) Apache Hive
+    The following NOTICE information applies:
+      Apache Hive
+      Copyright 2008-2015 The Apache Software Foundation
+
+      This product includes software developed by The Apache Software
+      Foundation (http://www.apache.org/).
+
+      This product includes Jersey (https://jersey.java.net/)
+      Copyright (c) 2010-2014 Oracle and/or its affiliates.
+
+      This project includes software copyrighted by Microsoft Corporation and
+      licensed under the Apache License, Version 2.0.
+
+      This project includes software copyrighted by Dell SecureWorks and
+      licensed under the Apache License, Version 2.0.
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+       ## Credits
+
+       A list of contributors may be found from CREDITS file, which is included
+       in some artifacts (usually source distributions); but is always available
+       from the source code management (SCM) system project uses.
+
+  (ASLv2) BoneCP
+    The following NOTICE information applies:
+       BoneCP
+       Copyright 2010 Wallace Wadge
+
+  (ASLv2) Apache Hadoop
+    The following NOTICE information applies:
+      The binary distribution of this product bundles binaries of
+      org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which has the
+      following notices:
+      * Copyright 2011 Dain Sundstrom <da...@iq80.com>
+      * Copyright 2011 FuseSource Corp. http://fusesource.com
+
+      The binary distribution of this product bundles binaries of
+      org.fusesource.hawtjni:hawtjni-runtime (https://github.com/fusesource/hawtjni),
+      which has the following notices:
+      * This product includes software developed by FuseSource Corp.
+        http://fusesource.com
+      * This product includes software developed at
+        Progress Software Corporation and/or its  subsidiaries or affiliates.
+      * This product includes software developed by IBM Corporation and others.
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+  (ASLv2) Apache Curator
+    The following NOTICE information applies:
+      Curator Framework
+      Copyright 2011-2014 The Apache Software Foundation
+
+      Curator Client
+      Copyright 2011-2014 The Apache Software Foundation
+
+      Curator Recipes
+      Copyright 2011-2014 The Apache Software Foundation
+
+  (ASLv2) Apache Derby
+    The following NOTICE information applies:
+      Apache Derby
+      Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
+      the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.
+
+  (ASLv2) Apache Geronimo
+    The following NOTICE information applies:
+      Apache Geronimo
+      Copyright 2003-2008 The Apache Software Foundation
+
+  (ASLv2) Jettison
+    The following NOTICE information applies:
+       Copyright 2006 Envoi Solutions LLC
+
+  (ASLv2) Jetty
+    The following NOTICE information applies:
+       Jetty Web Container
+       Copyright 1995-2019 Mort Bay Consulting Pty Ltd.
+
+  (ASLv2) Apache log4j
+    The following NOTICE information applies:
+      Apache log4j
+      Copyright 2007 The Apache Software Foundation
+
+  (ASLv2) Apache Thrift
+    The following NOTICE information applies:
+      Apache Thrift
+      Copyright 2006-2010 The Apache Software Foundation.
+
+  (ASLv2) Dropwizard Metrics
+    The following NOTICE information applies:
+      Metrics
+      Copyright 2010-2013 Coda Hale and Yammer, Inc.
+
+      This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
+      LongAdder), which was released with the following comments:
+
+          Written by Doug Lea with assistance from members of JCP JSR-166
+          Expert Group and released to the public domain, as explained at
+          http://creativecommons.org/publicdomain/zero/1.0/
+
+  (ASLv2) Joda Time
+      The following NOTICE information applies:
+        This product includes software developed by
+        Joda.org (http://www.joda.org/).
+
+  (ASLv2) The Netty Project
+      The following NOTICE information applies:
+        The Netty Project
+        Copyright 2011 The Netty Project
+
+  (ASLv2) Apache ZooKeeper
+     The following NOTICE information applies:
+       Apache ZooKeeper
+       Copyright 2009-2012 The Apache Software Foundation
+
+  (ASLv2) JPam
+    The following NOTICE information applies:
+      Copyright 2003-2006 Greg Luck
+
+  (ASLv2) Groovy 2.4.16 (http://www.groovy-lang.org)
+        groovy-2.4.16-indy
+        groovy-json-2.4.16-indy
+        groovy-sql-2.4.16-indy
+    The following NOTICE information applies:
+        Apache Groovy
+        Copyright 2003-2018 The Apache Software Foundation
+
+        This product includes software developed at
+        The Apache Software Foundation (http://www.apache.org/).
+
+        This product includes/uses ANTLR (http://www.antlr2.org/)
+        developed by Terence Parr 1989-2006
+
+  (ASLv2) ASM Based Accessors Helper Used By JSON Smart (net.minidev:accessors-smart:jar:1.2 - http://www.minidev.net/)
+    The following NOTICE information applies:
+      ASM Based Accessors Helper Used By JSON Smart 1.2
+      Copyright 2017, Uriel Chemouni
+
+  (ASLv2) JSON Smart (net.minidev:json-smart:jar:2.3 - http://www.minidev.net/)
+        The following NOTICE information applies:
+          JSON Smart 2.3
+          Copyright 2017, Uriel Chemouni, Eitan Raviv
+
+  (ASLv2) Nimbus JOSE+JWT (com.nimbusds:nimbus-jose-jwt - https://connect2id.com/products/nimbus-jose-jwt)
+    The following NOTICE information applies:
+      Nimbus JOSE+JWT
+      Copyright 2021, Connect2id Ltd.
+
+  (ASLv2) Woodstox (com.fasterxml.woodstox:woodstox-core:bundle:5.3.0 - https://github.com/FasterXML/woodstox)
+      The following NOTICE information applies:
+        Woodstox Core 5.3.0
+        Copyright 2015, FasterXML, LLC
+
+  (ASLv2) java-util
+    The following NOTICE information applies:
+       java-util
+       Copyright 2011-2017 Metamarkets Group Inc.
+
+  (ASLv2) JCIP Annotations Under Apache License
+    The following NOTICE information applies:
+      JCIP Annotations Under Apache License
+      Copyright 2013 Stephen Connolly.
+
+  (ASLv2) Google GSON
+    The following NOTICE information applies:
+      Copyright 2008 Google Inc.
+
+  (ASLv2) Guava
+    The following NOTICE information applies:
+      Guava
+      Copyright 2015 The Guava Authors
+
+  (ASLv2) OkHttp
+    The following NOTICE information applies:
+      OkHttp
+      Copyright (C) 2014 Square, Inc.
+
+  (ASLv2) Okio
+    The following NOTICE information applies:
+      Okio
+      Copyright (C) 2014 Square, Inc.
+
+  (ASLv2) Dropwizard Metrics
+      The following NOTICE information applies:
+        Dropwizard Metrics
+        Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
+
+  (ASLv2) atinject (javax.inject:javax.inject)
+    The following NOTICE information applies:
+      atinject
+      Copyright
+
+  (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
+
+  (ASLv2) JetBrains/java-annotations
+    The following NOTICE information applies:
+      JetBrains/java-annotations
+      Copyright 2000-2016 JetBrains s.r.o.
+
+  (ASLv2) Apache Kerby
+    The following NOTICE information applies:
+      Apache Kerby
+      Copyright 2003-2018 The Apache Software Foundation
+
+    (ASLv2) Carrotsearch HPPC
+      The following NOTICE information applies:
+      HPPC borrowed code, ideas or both from:
+
+       * Apache Lucene, http://lucene.apache.org/
+         (Apache license)
+       * Fastutil, http://fastutil.di.unimi.it/
+         (Apache license)
+       * Koloboke, https://github.com/OpenHFT/Koloboke
+         (Apache license)
+
+  (ASLv2) Ehcache 2.x
+    The following NOTICE information applies:
+      Copyright 2003-2010 Terracotta, Inc.
+
+  (ASLv2) Google Guice
+    The following NOTICE information applies:
+      Google Guice - Core Library
+      Copyright 2006-2011 Google, Inc.
+
+      Google Guice - Extensions - Servlet
+      Copyright 2006-2011 Google, Inc.
+
+  (ASLv2) Apache Arrow
+    The following NOTICE information applies:
+      Copyright 2016-2019 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+
+  (ASLv2) Apache ORC
+    The following NOTICE information applies:
+      Copyright 2013-2019 The Apache Software Foundation
+
+      This product includes software developed by The Apache Software
+      Foundation (http://www.apache.org/).
+
+      This product includes software developed by Hewlett-Packard:
+      (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+  The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net)
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/)
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/)
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/)
+        (CDDL 1.1) (GPL2 w/ CPE) jersey-guice (com.sun.jersey.contribs:jersey-guice:jar:1.19 - https://jersey.java.net/)
+        (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.11 - https://jaxb.dev.java.net/)
+
+************************
+Common Development and Distribution License 1.0
+************************
+
+    The following binary components are provided under the Common Development and Distribution License 1.0.  See project link for details.
+
+    (CDDL 1.0) JavaServlet(TM) Specification (javax.servlet:servlet-api:jar:3.1.0 - no url available)
+    (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
+    (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
+
+*****************
+Public Domain
+*****************
+
+  The following binary components are provided to the 'Public Domain'.  See project link for details.
+
+      (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)
+
+
+************************
+Eclipse Distribution License 1.0
+************************
+
+  The following binary components are provided under the Eclipse Distribution License 1.0.
+
+      (EDL 1.0) Jakarta Activation API (jakarta.activation:jakarta.activation-api:jar:1.2.1)
+      (EDL 1.0) Jakarta XML Binding API (jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3)
+
+************************
+Eclipse Public License 2.0
+************************
+
+  The following binary components are provided under the Eclipse Public License 2.0.
+
+      (EPL 2.0) javax.ws.rs-api (https://github.com/eclipse-ee4j/jaxrs-api) javax.ws.rs:javax.ws.rs-api:bundle:2.1.1
+
+************************
+BSD License
+************************
+
+  (BSD) JSch
+    The following NOTICE information applies:
+      Copyright (c) 2002-2015 Atsuhiko Yamanaka, JCraft,Inc.
+      All rights reserved.
+      https://www.jcraft.com/jsch/
+
+  (BSD 3-Clause) JLine Bundle
+    The following NOTICE information applies:
+      Copyright (c) 2002-2007, Marc Prud'hommeaux. All rights reserved.
+      https://github.com/jline/jline1
+
+  (BSD 3-Clause) ThreeTen-Extra
+    The following NOTICE information applies:
+      Copyright (c) 2007-2022, Stephen Colebourne & Michael Nascimento Santos.
+      https://github.com/ThreeTen/threeten-extra/
+
+************************
+Go License
+************************
+
+The following binary components are provided under the Go License.  See project link for details.
+
+  (Go) RE2/J
+    The following NOTICE information applies:
+      Copyright (c) 2009 The Go Authors. All rights reserved.
+      https://github.com/google/re2j
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
new file mode 100644
index 0000000000..0d96a0b180
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-services-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <!-- Internal dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+
+        <!-- External dependencies -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-hive-metastore</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-parquet</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-orc</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.gson</groupId>
+                    <artifactId>gson</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>javax.servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-hbase-compat-1.0</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop-bundle</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.tdunning</groupId>
+                    <artifactId>json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.transaction</groupId>
+                    <artifactId>transaction-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.zaxxer</groupId>
+                    <artifactId>HikariCP</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
new file mode 100644
index 0000000000..991ac625dc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * Provides a basic connector to Iceberg catalog services.
+ */
+public interface IcebergCatalogService extends ControllerService {
+
+    Catalog getCatalog();
+
+    Configuration getConfiguration();
+}
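
To illustrate the contract, here is a minimal sketch (not part of this commit) of how a client such as a processor might consume the service; the resolver class and its wiring are hypothetical, while Catalog.loadTable and TableIdentifier.of are standard Iceberg APIs.

    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.nifi.services.iceberg.IcebergCatalogService;

    // Hypothetical helper: resolves an Iceberg Table through whichever catalog
    // implementation (Hive Metastore or Hadoop warehouse) backs the configured service.
    public class IcebergTableResolver {

        private final IcebergCatalogService catalogService;

        public IcebergTableResolver(IcebergCatalogService catalogService) {
            this.catalogService = catalogService;
        }

        public Table resolve(String namespace, String tableName) {
            Catalog catalog = catalogService.getCatalog();
            return catalog.loadTable(TableIdentifier.of(namespace, tableName));
        }
    }

In a real NiFi processor, the service reference would typically be obtained from a controller-service property rather than constructor injection.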
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/pom.xml
new file mode 100644
index 0000000000..c376976906
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-services-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api-nar</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..6effaa89d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..ee13e4dc2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,17 @@
+nifi-iceberg-services-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+  (ASLv2) Apache Iceberg
+    The following NOTICE information applies:
+      Apache Iceberg
+      Copyright 2017-2022 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
new file mode 100644
index 0000000000..37c4e8150f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
new file mode 100644
index 0000000000..38f156c68d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+
+
+/**
+ * Abstract class holding common properties and methods for Catalog Service implementations.
+ */
+public abstract class AbstractCatalogService extends AbstractControllerService implements IcebergCatalogService {
+
+    protected Configuration configuration = new Configuration();
+
+    static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("hadoop-config-resources")
+            .displayName("Hadoop Configuration Resources")
+            .description("A file, or comma separated list of files, which contain the Hadoop configuration (core-site.xml, etc.). Without this, default configuration will be used.")
+            .required(false)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .dynamicallyModifiesClasspath(true)
+            .build();
+
+    /**
+     * Loads configuration files from the provided paths.
+     *
+     * @param configFiles list of config file paths separated with comma
+     * @return merged configuration
+     */
+    protected Configuration getConfigurationFromFiles(String configFiles) {
+        final Configuration conf = new Configuration();
+        if (StringUtils.isNotBlank(configFiles)) {
+            for (final String configFile : configFiles.split(",")) {
+                conf.addResource(new Path(configFile.trim()));
+            }
+        }
+        return conf;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+}
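
For reference, the effect of getConfigurationFromFiles() can be reproduced with the plain Hadoop client API. The sketch below is illustrative only; the file paths in it are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ConfigMergeSketch {
        public static void main(String[] args) {
            // Each addResource() call merges another file into the same Configuration;
            // keys defined in later files override the same keys from earlier files.
            final Configuration conf = new Configuration();
            conf.addResource(new Path("/etc/hadoop/conf/core-site.xml")); // hypothetical path
            conf.addResource(new Path("/etc/hadoop/conf/hive-site.xml")); // hypothetical path

            // Merged values are then available through normal lookups, e.g. the
            // metastore URI that HiveCatalogService.customValidate() checks for.
            System.out.println(conf.get("hive.metastore.uris"));
        }
    }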
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
new file mode 100644
index 0000000000..dcf2dd395f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Tags({"iceberg", "catalog", "service", "hadoop", "hdfs"})
+@CapabilityDescription("Catalog service that can use HDFS or similar file systems that support atomic rename.")
+public class HadoopCatalogService extends AbstractCatalogService {
+
+    static final PropertyDescriptor WAREHOUSE_PATH = new PropertyDescriptor.Builder()
+            .name("warehouse-path")
+            .displayName("Warehouse Path")
+            .description("Path to the location of the warehouse.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            WAREHOUSE_PATH,
+            HADOOP_CONFIGURATION_RESOURCES
+    ));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private HadoopCatalog catalog;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final String warehousePath = context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue();
+
+        if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
+            final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+
+            configuration = getConfigurationFromFiles(configFiles);
+            catalog = new HadoopCatalog(configuration, warehousePath);
+        } else {
+            catalog = new HadoopCatalog(new Configuration(), warehousePath);
+        }
+    }
+
+    @Override
+    public Catalog getCatalog() {
+        return catalog;
+    }
+
+}
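
To illustrate what this service exposes, here is a minimal standalone sketch of using a HadoopCatalog (the object returned by getCatalog()) directly with the Iceberg API; the warehouse path, namespace, and table name are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;
    import org.apache.iceberg.types.Types;

    public class HadoopCatalogSketch {
        public static void main(String[] args) {
            // The catalog stores table metadata directly on the file system,
            // which is why the underlying storage must support atomic rename.
            final HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/iceberg-warehouse");

            final Schema schema = new Schema(
                    Types.NestedField.required(1, "id", Types.LongType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()));

            final TableIdentifier identifier = TableIdentifier.of("default", "users");
            final Table table = catalog.tableExists(identifier)
                    ? catalog.loadTable(identifier)
                    : catalog.createTable(identifier, schema);

            System.out.println(table.location());
        }
    }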
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
new file mode 100644
index 0000000000..9dc648c3e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"iceberg", "catalog", "service", "metastore", "hive"})
+@CapabilityDescription("Catalog service that connects to a Hive metastore to keep track of Iceberg tables.")
+public class HiveCatalogService extends AbstractCatalogService {
+
+    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+            .name("hive-metastore-uri")
+            .displayName("Hive Metastore URI")
+            .description("The URI location(s) for the Hive metastore; note that this is not the location of the Hive Server. The default port for the Hive metastore is 9083.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.URI_LIST_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WAREHOUSE_LOCATION = new PropertyDescriptor.Builder()
+            .name("warehouse-location")
+            .displayName("Default Warehouse Location")
+            .description("Location of the default database for the warehouse. This field sets or overrides the 'hive.metastore.warehouse.dir' configuration property.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            METASTORE_URI,
+            WAREHOUSE_LOCATION,
+            HADOOP_CONFIGURATION_RESOURCES
+    ));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private HiveCatalog catalog;
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+
+        final List<ValidationResult> problems = new ArrayList<>();
+        String configMetastoreUri = null;
+        String configWarehouseLocation = null;
+
+        if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
+            final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+
+            Configuration configuration = getConfigurationFromFiles(configFiles);
+            configMetastoreUri = configuration.get("hive.metastore.uris");
+            configWarehouseLocation = configuration.get("hive.metastore.warehouse.dir");
+        }
+
+        final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
+        final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
+
+        if (configMetastoreUri == null && propertyMetastoreUri == null) {
+            problems.add(new ValidationResult.Builder()
+                    .subject("Hive Metastore URI")
+                    .valid(false)
+                    .explanation("Hive Metastore URI cannot be found; please provide it in the 'Hive Metastore URI' property" +
+                            " or provide a configuration file which contains the 'hive.metastore.uris' value.")
+                    .build());
+        }
+
+        if (configWarehouseLocation == null && propertyWarehouseLocation == null) {
+            problems.add(new ValidationResult.Builder()
+                    .subject("Default Warehouse Location")
+                    .valid(false)
+                    .explanation("Default warehouse location cannot be found; please provide it in the 'Default Warehouse Location' property" +
+                            " or provide a configuration file which contains the 'hive.metastore.warehouse.dir' value.")
+                    .build());
+        }
+
+        return problems;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        catalog = new HiveCatalog();
+        Map<String, String> properties = new HashMap<>();
+
+        if (context.getProperty(METASTORE_URI).isSet()) {
+            properties.put(CatalogProperties.URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(WAREHOUSE_LOCATION).isSet()) {
+            properties.put(CatalogProperties.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
+            final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+
+            configuration = getConfigurationFromFiles(configFiles);
+            catalog.setConf(configuration);
+        }
+
+        catalog.initialize("hive-catalog", properties);
+    }
+
+    @Override
+    public Catalog getCatalog() {
+        return catalog;
+    }
+}
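
For comparison, the onEnabled() logic above is equivalent to configuring a HiveCatalog by hand. A minimal sketch, assuming a local metastore on the default Thrift port 9083 and a hypothetical warehouse directory:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.hive.HiveCatalog;

    public class HiveCatalogSketch {
        public static void main(String[] args) {
            // CatalogProperties.URI maps to the 'Hive Metastore URI' property and
            // CatalogProperties.WAREHOUSE_LOCATION to 'Default Warehouse Location'.
            final Map<String, String> properties = new HashMap<>();
            properties.put(CatalogProperties.URI, "thrift://localhost:9083");            // hypothetical URI
            properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/tmp/hive-warehouse"); // hypothetical path

            final HiveCatalog catalog = new HiveCatalog();
            catalog.setConf(new Configuration());
            catalog.initialize("hive-catalog", properties);

            System.out.println(catalog.name());
        }
    }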
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100755
index 0000000000..4c042964b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.services.iceberg.HiveCatalogService
+org.apache.nifi.services.iceberg.HadoopCatalogService
\ No newline at end of file
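
This file follows the standard java.util.ServiceLoader convention that NiFi's NAR extension discovery builds on. As a rough illustration (NiFi layers its own per-NAR class loading on top of this mechanism), such a file can be consumed like so:

    import java.util.ServiceLoader;

    import org.apache.nifi.controller.ControllerService;

    public class ServiceDiscoverySketch {
        public static void main(String[] args) {
            // Lists every ControllerService implementation advertised through
            // META-INF/services entries visible on the current classpath.
            for (ControllerService service : ServiceLoader.load(ControllerService.class)) {
                System.out.println(service.getClass().getName());
            }
        }
    }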
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/pom.xml
new file mode 100644
index 0000000000..6eb5af7028
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <properties>
+        <iceberg.version>0.14.0</iceberg.version>
+        <hive.version>3.1.3</hive.version>
+        <hadoop.version>3.3.3</hadoop.version>
+    </properties>
+
+    <modules>
+        <module>nifi-iceberg-services-api</module>
+        <module>nifi-iceberg-services-api-nar</module>
+        <module>nifi-iceberg-services</module>
+        <module>nifi-iceberg-services-nar</module>
+        <module>nifi-iceberg-processors</module>
+        <module>nifi-iceberg-processors-nar</module>
+    </modules>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 9856777d0d..d9521d3396 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -119,6 +119,7 @@
         <module>nifi-box-bundle</module>
         <module>nifi-flow-registry-client-bundle</module>
         <module>nifi-shopify-bundle</module>
+        <module>nifi-iceberg-bundle</module>
     </modules>
 
     <build>