You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/10/11 19:53:44 UTC
[nifi] branch main updated: NIFI-10442: Create PutIceberg processor
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e87bced147 NIFI-10442: Create PutIceberg processor
e87bced147 is described below
commit e87bced14775e19208617ad48c4ec20b98f8699f
Author: Mark Bathori <ba...@gmail.com>
AuthorDate: Tue Sep 6 10:06:17 2022 +0200
NIFI-10442: Create PutIceberg processor
This closes #6368.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
nifi-assembly/pom.xml | 26 +
.../serialization/record/util/DataTypeUtils.java | 6 +-
.../nifi-iceberg-processors-nar/pom.xml | 49 ++
.../src/main/resources/META-INF/LICENSE | 222 +++++++
.../src/main/resources/META-INF/NOTICE | 505 ++++++++++++++
.../nifi-iceberg-processors/pom.xml | 175 +++++
.../iceberg/AbstractIcebergProcessor.java | 125 ++++
.../apache/nifi/processors/iceberg/PutIceberg.java | 260 ++++++++
.../iceberg/converter/ArrayElementGetter.java | 115 ++++
.../iceberg/converter/DataConverter.java | 24 +
.../iceberg/converter/GenericDataConverters.java | 230 +++++++
.../iceberg/converter/IcebergRecordConverter.java | 190 ++++++
.../iceberg/converter/RecordFieldGetter.java | 122 ++++
.../iceberg/writer/IcebergPartitionedWriter.java | 50 ++
.../iceberg/writer/IcebergTaskWriterFactory.java | 66 ++
.../services/org.apache.nifi.processor.Processor | 16 +
.../nifi/processors/iceberg/TestFileAbort.java | 108 +++
.../iceberg/TestIcebergRecordConverter.java | 706 ++++++++++++++++++++
.../iceberg/TestPutIcebergWithHadoopCatalog.java | 206 ++++++
.../iceberg/TestPutIcebergWithHiveCatalog.java | 276 ++++++++
.../iceberg/catalog/TestHadoopCatalogService.java | 51 ++
.../iceberg/catalog/TestHiveCatalogService.java | 58 ++
.../iceberg/metastore/MetastoreCore.java | 184 +++++
.../processors/iceberg/metastore/ScriptRunner.java | 76 +++
.../iceberg/metastore/ThriftMetastore.java | 50 ++
.../processors/iceberg/util/IcebergTestUtils.java | 134 ++++
.../src/test/resources/date.avsc | 44 ++
.../src/test/resources/hive-schema-3.2.0.derby.sql | 738 +++++++++++++++++++++
.../src/test/resources/user.avsc | 26 +
.../nifi-iceberg-services-api-nar/pom.xml | 43 ++
.../src/main/resources/META-INF/LICENSE | 209 ++++++
.../src/main/resources/META-INF/NOTICE | 501 ++++++++++++++
.../nifi-iceberg-services-api/pom.xml | 149 +++++
.../services/iceberg/IcebergCatalogService.java | 32 +
.../nifi-iceberg-services-nar/pom.xml | 43 ++
.../src/main/resources/META-INF/LICENSE | 209 ++++++
.../src/main/resources/META-INF/NOTICE | 17 +
.../nifi-iceberg-services/pom.xml | 43 ++
.../services/iceberg/AbstractCatalogService.java | 67 ++
.../services/iceberg/HadoopCatalogService.java | 77 +++
.../nifi/services/iceberg/HiveCatalogService.java | 140 ++++
.../org.apache.nifi.controller.ControllerService | 17 +
nifi-nar-bundles/nifi-iceberg-bundle/pom.xml | 44 ++
nifi-nar-bundles/pom.xml | 1 +
44 files changed, 6429 insertions(+), 1 deletion(-)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 3f56380639..599873016c 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1528,5 +1528,31 @@ language governing permissions and limitations under the License. -->
</plugins>
</build>
</profile>
+ <profile>
+ <id>include-iceberg</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-processors-nar</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-services-api-nar</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-services-nar</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
</project>
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 21e3b71827..5000c78fe1 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -236,11 +236,15 @@ public class DataTypeUtils {
return null;
}
- private static Object toUUID(Object value) {
+ public static UUID toUUID(Object value) {
if (value == null) {
throw new IllegalTypeConversionException("Null values cannot be converted to a UUID");
}
+ if (value instanceof UUID) {
+ return (UUID) value;
+ }
+
if (value instanceof String) {
try {
return UUID.fromString((String)value);
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
new file mode 100644
index 0000000000..2860e79244
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-iceberg-processors-nar</artifactId>
+ <packaging>nar</packaging>
+
+ <properties>
+ <maven.javadoc.skip>true</maven.javadoc.skip>
+ <source.skip>true</source.skip>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-processors</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-services-api-nar</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..f3574b43b4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,222 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Hive.
+
+* Hive metastore derby schema in hive-schema-3.2.0.derby.sql
+* Test methods from IcebergTestUtils.java
+* Test metastore from MetastoreCore.java, ScriptRunner.java, ThriftMetastore.java
+
+Copyright: 2011-2018 The Apache Software Foundation
+Home page: https://hive.apache.org/
+License: https://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..ddc1223787
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,505 @@
+nifi-iceberg-processors-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+ (ASLv2) Apache Iceberg
+ The following NOTICE information applies:
+ Apache Iceberg
+ Copyright 2017-2022 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache Ant
+ The following NOTICE information applies:
+ Apache Ant
+ Copyright 1999-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Apache Commons DBCP
+ The following NOTICE information applies:
+ Apache Commons DBCP
+ Copyright 2001-2015 The Apache Software Foundation.
+
+ (ASLv2) Apache HttpComponents
+ The following NOTICE information applies:
+ Apache HttpComponents Client
+ Copyright 1999-2016 The Apache Software Foundation
+ Apache HttpComponents Core - HttpCore
+ Copyright 2006-2009 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Pool
+ The following NOTICE information applies:
+ Apache Commons Pool
+ Copyright 1999-2009 The Apache Software Foundation.
+
+ (ASLv2) Apache Commons BeanUtils
+ The following NOTICE information applies:
+ Apache Commons BeanUtils
+ Copyright 2000-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons IO
+ The following NOTICE information applies:
+ Apache Commons IO
+ Copyright 2002-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Net
+ The following NOTICE information applies:
+ Apache Commons Net
+ Copyright 2001-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Avro
+ The following NOTICE information applies:
+ Apache Avro
+ Copyright 2009-2017 The Apache Software Foundation
+
+ (ASLv2) Apache Parquet
+ The following NOTICE information applies:
+ Apache Parquet MR (Incubating)
+ Copyright 2014 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Audience Annotations
+ The following NOTICE information applies:
+ Apache Yetus
+ Copyright 2008-2018 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Compress
+ The following NOTICE information applies:
+ Apache Commons Compress
+ Copyright 2002-2017 The Apache Software Foundation
+
+ The files in the package org.apache.commons.compress.archivers.sevenz
+ were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+ which has been placed in the public domain:
+
+ "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
+
+ (ASLv2) Apache Commons Configuration
+ The following NOTICE information applies:
+ Apache Commons Configuration
+ Copyright 2001-2017 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Text
+ The following NOTICE information applies:
+ Apache Commons Text
+ Copyright 2001-2018 The Apache Software Foundation
+
+ (ASLv2) Apache Commons CLI
+ The following NOTICE information applies:
+ Apache Commons CLI
+ Copyright 2001-2017 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache Commons Collections
+ The following NOTICE information applies:
+ Apache Commons Collections
+ Copyright 2001-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Daemon
+ The following NOTICE information applies:
+ Apache Commons Daemon
+ Copyright 2002-2013 The Apache Software Foundation
+
+ (ASLv2) Apache Ivy
+ The following NOTICE information applies:
+ Copyright 2007-2017 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Portions of Ivy were originally developed at
+ Jayasoft SARL (http://www.jayasoft.fr/)
+ and are licensed to the Apache Software Foundation under the
+ "Software Grant License Agreement"
+
+ SSH and SFTP support is provided by the JCraft JSch package,
+ which is open source software, available under
+ the terms of a BSD style license.
+ The original software and related information is available
+ at http://www.jcraft.com/jsch/.
+
+ (ASLv2) Apache Commons Math
+ The following NOTICE information applies:
+ Apache Commons Math
+ Copyright 2001-2012 The Apache Software Foundation
+
+ (ASLv2) Apache Hive
+ The following NOTICE information applies:
+ Apache Hive
+ Copyright 2008-2015 The Apache Software Foundation
+
+ This product includes software developed by The Apache Software
+ Foundation (http://www.apache.org/).
+
+ This product includes Jersey (https://jersey.java.net/)
+ Copyright (c) 2010-2014 Oracle and/or its affiliates.
+
+ This project includes software copyrighted by Microsoft Corporation and
+ licensed under the Apache License, Version 2.0.
+
+ This project includes software copyrighted by Dell SecureWorks and
+ licensed under the Apache License, Version 2.0.
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+ (ASLv2) BoneCP
+ The following NOTICE information applies:
+ BoneCP
+ Copyright 2010 Wallace Wadge
+
+ (ASLv2) Apache Hadoop
+ The following NOTICE information applies:
+ The binary distribution of this product bundles binaries of
+ org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which has the
+ following notices:
+ * Copyright 2011 Dain Sundstrom <da...@iq80.com>
+ * Copyright 2011 FuseSource Corp. http://fusesource.com
+
+ The binary distribution of this product bundles binaries of
+ org.fusesource.hawtjni:hawtjni-runtime (https://github.com/fusesource/hawtjni),
+ which has the following notices:
+ * This product includes software developed by FuseSource Corp.
+ http://fusesource.com
+ * This product includes software developed at
+ Progress Software Corporation and/or its subsidiaries or affiliates.
+ * This product includes software developed by IBM Corporation and others.
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2015 The Apache Software Foundation
+
+ (ASLv2) Apache Curator
+ The following NOTICE information applies:
+ Curator Framework
+ Copyright 2011-2014 The Apache Software Foundation
+
+ Curator Client
+ Copyright 2011-2014 The Apache Software Foundation
+
+ Curator Recipes
+ Copyright 2011-2014 The Apache Software Foundation
+
+ (ASLv2) Apache Derby
+ The following NOTICE information applies:
+ Apache Derby
+ Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
+ the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.
+
+ (ASLv2) Apache Geronimo
+ The following NOTICE information applies:
+ Apache Geronimo
+ Copyright 2003-2008 The Apache Software Foundation
+
+ (ASLv2) Jettison
+ The following NOTICE information applies:
+ Copyright 2006 Envoi Solutions LLC
+
+ (ASLv2) Jetty
+ The following NOTICE information applies:
+ Jetty Web Container
+ Copyright 1995-2019 Mort Bay Consulting Pty Ltd.
+
+ (ASLv2) Apache log4j
+ The following NOTICE information applies:
+ Apache log4j
+ Copyright 2007 The Apache Software Foundation
+
+ (ASLv2) Apache Thrift
+ The following NOTICE information applies:
+ Apache Thrift
+ Copyright 2006-2010 The Apache Software Foundation.
+
+ (ASLv2) Dropwizard Metrics
+ The following NOTICE information applies:
+ Metrics
+ Copyright 2010-2013 Coda Hale and Yammer, Inc.
+
+ This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
+ LongAdder), which was released with the following comments:
+
+ Written by Doug Lea with assistance from members of JCP JSR-166
+ Expert Group and released to the public domain, as explained at
+ http://creativecommons.org/publicdomain/zero/1.0/
+
+ (ASLv2) Joda Time
+ The following NOTICE information applies:
+ This product includes software developed by
+ Joda.org (http://www.joda.org/).
+
+ (ASLv2) The Netty Project
+ The following NOTICE information applies:
+ The Netty Project
+ Copyright 2011 The Netty Project
+
+ (ASLv2) Apache ZooKeeper
+ The following NOTICE information applies:
+ Apache ZooKeeper
+ Copyright 2009-2012 The Apache Software Foundation
+
+ (ASLv2) Google GSON
+ The following NOTICE information applies:
+ Copyright 2008 Google Inc.
+
+ (ASLv2) JPam
+ The following NOTICE information applies:
+ Copyright 2003-2006 Greg Luck
+
+ (ASLv2) Groovy 2.4.16 (http://www.groovy-lang.org)
+ groovy-2.4.16-indy
+ groovy-json-2.4.16-indy
+ groovy-sql-2.4.16-indy
+ The following NOTICE information applies:
+ Apache Groovy
+ Copyright 2003-2018 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ This product includes/uses ANTLR (http://www.antlr2.org/)
+ developed by Terence Parr 1989-2006
+
+ (ASLv2) ASM Based Accessors Helper Used By JSON Smart (net.minidev:accessors-smart:jar:1.2 - http://www.minidev.net/)
+ The following NOTICE information applies:
+ ASM Based Accessors Helper Used By JSON Smart 1.2
+ Copyright 2017, Uriel Chemouni
+
+ (ASLv2) JSON Smart (net.minidev:json-smart:jar:2.3 - http://www.minidev.net/)
+ The following NOTICE information applies:
+ JSON Smart 2.3
+ Copyright 2017, Uriel Chemouni, Eitan Raviv
+
+ (ASLv2) Nimbus JOSE+JWT (com.nimbusds:nimbus-jose-jwt - https://connect2id.com/products/nimbus-jose-jwt)
+ The following NOTICE information applies:
+ Nimbus JOSE+JWT
+ Copyright 2021, Connect2id Ltd.
+
+ (ASLv2) Woodstox (com.fasterxml.woodstox:woodstox-core:bundle:5.3.0 - https://github.com/FasterXML/woodstox)
+ The following NOTICE information applies:
+ Woodstox Core 5.3.0
+ Copyright 2015, FasterXML, LLC
+
+ (ASLv2) Joda Time
+ The following NOTICE information applies:
+ This product includes software developed by
+ Joda.org (http://www.joda.org/).
+
+ (ASLv2) java-util
+ The following NOTICE information applies:
+ java-util
+ Copyright 2011-2017 Metamarkets Group Inc.
+
+ (ASLv2) JCIP Annotations Under Apache License
+ The following NOTICE information applies:
+ JCIP Annotations Under Apache License
+ Copyright 2013 Stephen Connolly.
+
+ (ASLv2) Google GSON
+ The following NOTICE information applies:
+ Copyright 2008 Google Inc.
+
+ (ASLv2) Guava
+ The following NOTICE information applies:
+ Guava
+ Copyright 2015 The Guava Authors
+
+ (ASLv2) OkHttp
+ The following NOTICE information applies:
+ OkHttp
+ Copyright (C) 2014 Square, Inc.
+
+ (ASLv2) Okio
+ The following NOTICE information applies:
+ Okio
+ Copyright (C) 2014 Square, Inc.
+
+ (ASLv2) Dropwizard Metrics
+ The following NOTICE information applies:
+ Dropwizard Metrics
+ Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
+
+ (ASLv2) atinject (javax.inject:javax.inject)
+ The following NOTICE information applies:
+ atinject
+ Copyright
+
+ (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
+
+ (ASLv2) JetBrains/java-annotations
+ The following NOTICE information applies:
+ JetBrains/java-annotations
+ Copyright 2000-2016 JetBrains s.r.o.
+
+ (ASLv2) Apache Kerby
+ The following NOTICE information applies:
+ Apache Kerby
+ Copyright 2003-2018 The Apache Software Foundation
+
+ (ASLv2) Carrotsearch HPPC
+ The following NOTICE information applies:
+ HPPC borrowed code, ideas or both from:
+
+ * Apache Lucene, http://lucene.apache.org/
+ (Apache license)
+ * Fastutil, http://fastutil.di.unimi.it/
+ (Apache license)
+ * Koloboke, https://github.com/OpenHFT/Koloboke
+ (Apache license)
+
+ (ASLv2) Ehcache 2.x
+ The following NOTICE information applies:
+ Copyright 2003-2010 Terracotta, Inc.
+
+ (ASLv2) Google Guice
+ The following NOTICE information applies:
+ Google Guice - Core Library
+ Copyright 2006-2011 Google, Inc.
+
+ Google Guice - Extensions - Servlet
+ Copyright 2006-2011 Google, Inc.
+
+ (ASLv2) Apache Arrow
+ The following NOTICE information applies:
+ Copyright 2016-2019 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+
+ (ASLv2) Apache ORC
+ The following NOTICE information applies:
+ Copyright 2013-2019 The Apache Software Foundation
+
+ This product includes software developed by The Apache Software
+ Foundation (http://www.apache.org/).
+
+ This product includes software developed by Hewlett-Packard:
+ (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+ The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-guice (com.sun.jersey.contribs:jersey-guice:jar:1.19 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.11 - https://jaxb.dev.java.net/)
+
+************************
+Common Development and Distribution License 1.0
+************************
+
+ The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details.
+
+ (CDDL 1.0) JavaServlet(TM) Specification (javax.servlet:servlet-api:jar:3.1.0 - no url available)
+ (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
+ (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
+
+*****************
+Public Domain
+*****************
+
+ The following binary components are provided to the 'Public Domain'. See project link for details.
+
+ (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)
+
+
+************************
+Eclipse Distribution License 1.0
+************************
+
+ The following binary components are provided under the Eclipse Distribution License 1.0.
+
+ (EDL 1.0) Jakarta Activation API (jakarta.activation:jakarta.activation-api:jar:1.2.1)
+ (EDL 1.0) Jakarta XML Binding API (jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3)
+
+************************
+Eclipse Public License 2.0
+************************
+
+ The following binary components are provided under the Eclipse Public License 2.0.
+
+ (EPL 2.0) javax.ws.rs-api (https://github.com/eclipse-ee4j/jaxrs-api) javax.ws.rs:javax.ws.rs-api:bundle:2.1.1
+
+************************
+BSD License
+************************
+
+ (BSD) JSch
+ The following NOTICE information applies:
+ Copyright (c) 2002-2015 Atsuhiko Yamanaka, JCraft,Inc.
+ All rights reserved.
+ https://www.jcraft.com/jsch/
+
+ (BSD 3-Clause) JLine Bundle
+ The following NOTICE information applies:
+ Copyright (c) 2002-2007, Marc Prud'hommeaux. All rights reserved.
+ https://github.com/jline/jline1
+
+ (BSD 3-Clause) ThreeTen-Extra
+ The following NOTICE information applies:
+ Copyright (c) 2007-2022, Stephen Colebourne & Michael Nascimento Santos.
+ https://github.com/ThreeTen/threeten-extra/
+
+************************
+Go License
+************************
+
+The following binary components are provided under the Go License. See project link for details.
+
+ (Go) RE2/J
+ The following NOTICE information applies:
+ Copyright (c) 2009 The Go Authors. All rights reserved.
+ https://github.com/google/re2j
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
new file mode 100644
index 0000000000..09c5081d08
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-iceberg-processors</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- Internal dependencies -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-services-api</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-user-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-kerberos-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-kerberos</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hadoop-utils</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- External dependencies -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ <classifier>core</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-web</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-tez</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-druid</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
new file mode 100644
index 0000000000..7dc53eefd7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+import static org.apache.nifi.processors.iceberg.PutIceberg.REL_FAILURE;
+
+/**
+ * Base Iceberg processor class.
+ *
+ * Declares the property descriptors shared by Iceberg processors (catalog
+ * service, Kerberos user service) and implements {@code onTrigger} so that,
+ * when a Kerberos User Service is configured, the concrete processor logic
+ * ({@link #doOnTrigger}) is executed inside a privileged
+ * {@code UserGroupInformation#doAs} block; without Kerberos it is invoked
+ * directly.
+ */
+public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+
+ static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+ .name("catalog-service")
+ .displayName("Catalog Service")
+ .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+ .identifiesControllerService(IcebergCatalogService.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-user-service")
+ .displayName("Kerberos User Service")
+ .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.")
+ .identifiesControllerService(KerberosUserService.class)
+ .build();
+
+ // Both fields are set in onScheduled (only when a Kerberos User Service is
+ // configured) and cleared in onStopped. Volatile because scheduling and
+ // triggering may happen on different framework threads.
+ private volatile KerberosUser kerberosUser;
+ private volatile UserGroupInformation ugi;
+
+ /**
+ * Creates the Kerberos user and its UserGroupInformation up front so that
+ * authentication failures surface at scheduling time rather than per FlowFile.
+ *
+ * @throws ProcessException if the Kerberos login cannot be established
+ */
+ @OnScheduled
+ public final void onScheduled(final ProcessContext context) {
+ final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+ final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+
+ if (kerberosUserService != null) {
+ this.kerberosUser = kerberosUserService.createKerberosUser();
+ try {
+ this.ugi = getUgiForKerberosUser(catalogService.getConfiguration(), kerberosUser);
+ } catch (IOException e) {
+ throw new ProcessException("Kerberos Authentication failed", e);
+ }
+ }
+ }
+
+ /**
+ * Logs out the Kerberos user (if any) and resets the cached state.
+ * Logout failures are logged rather than rethrown so shutdown always completes.
+ */
+ @OnStopped
+ public final void onStopped() {
+ if (kerberosUser != null) {
+ try {
+ kerberosUser.logout();
+ } catch (KerberosLoginException e) {
+ getLogger().error("Error logging out kerberos user", e);
+ } finally {
+ kerberosUser = null;
+ ugi = null;
+ }
+ }
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ // No Kerberos configured: run the subclass logic directly. Otherwise run it
+ // as a privileged action under the authenticated UGI; any failure there
+ // (including exceptions thrown by doOnTrigger) routes the FlowFile to failure.
+ if (kerberosUser == null) {
+ doOnTrigger(context, session, flowFile);
+ } else {
+ try {
+ getUgi().doAs((PrivilegedExceptionAction<Void>) () -> {
+ doOnTrigger(context, session, flowFile);
+ return null;
+ });
+
+ } catch (Exception e) {
+ getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+ }
+
+ // Revalidates the TGT (re-logging in if needed) before handing out the UGI.
+ private UserGroupInformation getUgi() {
+ try {
+ kerberosUser.checkTGTAndRelogin();
+ } catch (KerberosLoginException e) {
+ throw new ProcessException("Unable to re-login with kerberos credentials for " + kerberosUser.getPrincipal(), e);
+ }
+ return ugi;
+ }
+
+ /**
+ * Subclass hook containing the actual per-FlowFile processing; invoked by
+ * onTrigger, inside a Kerberos doAs block when one is configured.
+ */
+ protected abstract void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException;
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
new file mode 100644
index 0000000000..7fb136b853
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.Tasks;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
+/**
+ * Processor that reads records from the incoming FlowFile with a configured
+ * Record Reader, converts them to Iceberg records and appends the resulting
+ * data files to the target Iceberg table in a single commit. On any failure
+ * the data files written so far are deleted (never committed) and the
+ * FlowFile is routed to the failure relationship.
+ */
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. " +
+ "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
+ "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+ "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile.")
+})
+public class PutIceberg extends AbstractIcebergProcessor {
+
+ // Name of the FlowFile attribute carrying the number of ingested records.
+ public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+ static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+ .name("catalog-namespace")
+ .displayName("Catalog Namespace")
+ .description("The namespace of the catalog.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("table-name")
+ .displayName("Table Name")
+ .description("The name of the table.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor FILE_FORMAT = new PropertyDescriptor.Builder()
+ .name("file-format")
+ .displayName("File Format")
+ .description("File format to use when writing Iceberg data files." +
+ " If not set, then the 'write.format.default' table property will be used, default value is parquet.")
+ .allowableValues(
+ new AllowableValue("AVRO"),
+ new AllowableValue("PARQUET"),
+ new AllowableValue("ORC"))
+ .build();
+
+ static final PropertyDescriptor MAXIMUM_FILE_SIZE = new PropertyDescriptor.Builder()
+ .name("maximum-file-size")
+ .displayName("Maximum File Size")
+ .description("The maximum size that a file can be, if the file size is exceeded a new file will be generated with the remaining data." +
+ " If not set, then the 'write.target-file-size-bytes' table property will be used, default value is 512 MB.")
+ .addValidator(StandardValidators.LONG_VALIDATOR)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, such as an invalid data or schema.")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ RECORD_READER,
+ CATALOG,
+ CATALOG_NAMESPACE,
+ TABLE_NAME,
+ FILE_FORMAT,
+ MAXIMUM_FILE_SIZE,
+ KERBEROS_USER_SERVICE
+ ));
+
+ public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ /**
+ * Loads the target table, streams the FlowFile's records through a TaskWriter
+ * and commits the produced data files in one append. If the table cannot be
+ * loaded or any record fails to convert/write, the FlowFile goes to failure;
+ * completed-but-uncommitted data files are deleted via {@link #abort}.
+ */
+ @Override
+ public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final String fileFormat = context.getProperty(FILE_FORMAT).evaluateAttributeExpressions().getValue();
+ final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions().getValue();
+
+ Table table;
+
+ try {
+ table = loadTable(context);
+ } catch (Exception e) {
+ getLogger().error("Failed to load table from catalog", e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ TaskWriter<org.apache.iceberg.data.Record> taskWriter = null;
+ int recordCount = 0;
+
+ try (final InputStream in = session.read(flowFile); final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+ final FileFormat format = getFileFormat(table.properties(), fileFormat);
+ // The FlowFile id is used as the writer's task id so files from
+ // concurrent FlowFiles do not collide.
+ final IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize);
+ taskWriter = taskWriterFactory.create();
+
+ final IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), reader.getSchema(), format);
+
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+ taskWriter.write(recordConverter.convert(record));
+ recordCount++;
+ }
+
+ // Nothing is visible in the table until this single append commit succeeds.
+ final WriteResult result = taskWriter.complete();
+ appendDataFiles(table, result);
+ } catch (Exception e) {
+ getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e);
+ try {
+ if (taskWriter != null) {
+ abort(taskWriter.dataFiles(), table);
+ }
+ } catch (Exception ex) {
+ // Best effort cleanup: log and still route to failure below.
+ getLogger().error("Failed to abort uncommitted data files", ex);
+ }
+
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount));
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+
+ /**
+ * Loads a table from the catalog service with the provided values from the property context.
+ *
+ * @param context holds the user provided information for the {@link Catalog} and the {@link Table}
+ * @return loaded table
+ */
+ private Table loadTable(PropertyContext context) {
+ final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+ final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+
+ final Catalog catalog = catalogService.getCatalog();
+
+ final Namespace namespace = Namespace.of(catalogNamespace);
+ final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
+
+ return catalog.loadTable(tableIdentifier);
+ }
+
+ /**
+ * Appends the pending data files to the given {@link Table}.
+ *
+ * @param table table to append
+ * @param result datafiles created by the {@link TaskWriter}
+ */
+ private void appendDataFiles(Table table, WriteResult result) {
+ final AppendFiles appender = table.newAppend();
+ Arrays.stream(result.dataFiles()).forEach(appender::appendFile);
+
+ appender.commit();
+ }
+
+ /**
+ * Determines the write file format from the requested value and the table configuration.
+ *
+ * @param tableProperties table properties
+ * @param fileFormat requested file format from the processor
+ * @return file format to use
+ */
+ private FileFormat getFileFormat(Map<String, String> tableProperties, String fileFormat) {
+ final String fileFormatName = fileFormat != null ? fileFormat : tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ return FileFormat.valueOf(fileFormatName.toUpperCase(Locale.ENGLISH));
+ }
+
+ /**
+ * Deletes the completed data files that have not been committed to the table yet.
+ *
+ * @param dataFiles files created by the task writer
+ * @param table table
+ */
+ // Package-private visibility — presumably so tests can exercise the abort
+ // path directly (a TestFileAbort class ships with this commit); confirm.
+ void abort(DataFile[] dataFiles, Table table) {
+ Tasks.foreach(dataFiles)
+ .throwFailureWhenFinished()
+ .noRetry()
+ .run(file -> table.io().deleteFile(file.path().toString()));
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
new file mode 100644
index 0000000000..9a98404b26
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.time.ZoneId;
+
+/**
+ * Factory for per-element accessors over NiFi record array values.
+ *
+ * For a given NiFi {@link DataType} it produces an {@link ElementGetter} that
+ * reads an element at a position and coerces it to the corresponding Java type
+ * using {@link DataTypeUtils}; null elements are returned as null without
+ * invoking the coercion.
+ */
+public class ArrayElementGetter {
+
+ // Field-name label passed to DataTypeUtils so coercion errors identify the source.
+ private static final String ARRAY_FIELD_NAME = "array element";
+
+ /**
+ * Creates an accessor for getting elements in an internal array data structure at the given
+ * position.
+ *
+ * @param dataType the element type of the array
+ */
+ public static ElementGetter createElementGetter(DataType dataType) {
+ ElementGetter elementGetter;
+ switch (dataType.getFieldType()) {
+ case STRING:
+ elementGetter = (array, pos) -> DataTypeUtils.toString(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case CHAR:
+ elementGetter = (array, pos) -> DataTypeUtils.toCharacter(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case BOOLEAN:
+ elementGetter = (array, pos) -> DataTypeUtils.toBoolean(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case DECIMAL:
+ elementGetter = (array, pos) -> DataTypeUtils.toBigDecimal(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case BYTE:
+ elementGetter = (array, pos) -> DataTypeUtils.toByte(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case SHORT:
+ elementGetter = (array, pos) -> DataTypeUtils.toShort(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case INT:
+ elementGetter = (array, pos) -> DataTypeUtils.toInteger(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case DATE:
+ elementGetter = (array, pos) -> DataTypeUtils.toLocalDate(array[pos], () -> DataTypeUtils.getDateTimeFormatter(dataType.getFormat(), ZoneId.systemDefault()), ARRAY_FIELD_NAME);
+ break;
+ case TIME:
+ elementGetter = (array, pos) -> DataTypeUtils.toTime(array[pos], () -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
+ break;
+ case LONG:
+ elementGetter = (array, pos) -> DataTypeUtils.toLong(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case BIGINT:
+ elementGetter = (array, pos) -> DataTypeUtils.toBigInt(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case FLOAT:
+ elementGetter = (array, pos) -> DataTypeUtils.toFloat(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case DOUBLE:
+ elementGetter = (array, pos) -> DataTypeUtils.toDouble(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case TIMESTAMP:
+ elementGetter = (array, pos) -> DataTypeUtils.toTimestamp(array[pos], () -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
+ break;
+ case UUID:
+ elementGetter = (array, pos) -> DataTypeUtils.toUUID(array[pos]);
+ break;
+ case ARRAY:
+ elementGetter = (array, pos) -> DataTypeUtils.toArray(array[pos], ARRAY_FIELD_NAME, ((ArrayDataType) dataType).getElementType());
+ break;
+ case MAP:
+ elementGetter = (array, pos) -> DataTypeUtils.toMap(array[pos], ARRAY_FIELD_NAME);
+ break;
+ case RECORD:
+ elementGetter = (array, pos) -> DataTypeUtils.toRecord(array[pos], ARRAY_FIELD_NAME);
+ break;
+ default:
+ // NOTE(review): consider including dataType.getFieldType() in the
+ // exception message to make unsupported-type failures diagnosable.
+ throw new IllegalArgumentException();
+ }
+
+ // Null guard wrapper: null elements short-circuit to null so the
+ // type-specific coercion above never sees a null value.
+ return (array, pos) -> {
+ if (array[pos] == null) {
+ return null;
+ }
+
+ return elementGetter.getElementOrNull(array, pos);
+ };
+ }
+
+ /**
+ * Accessor for getting the elements of an array during runtime.
+ */
+ public interface ElementGetter extends Serializable {
+ @Nullable
+ Object getElementOrNull(Object[] array, int pos);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/DataConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/DataConverter.java
new file mode 100644
index 0000000000..1ea761db48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/DataConverter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+/**
+ * Interface for data conversion between NiFi Record and Iceberg Record.
+ *
+ * @param <D> the source (NiFi-side) value type
+ * @param <T> the target (Iceberg-side) value type
+ */
+public interface DataConverter<D, T> {
+
+ /**
+ * Converts a single value from the source representation to the target representation.
+ *
+ * @param data the source value to convert
+ * @return the converted value
+ */
+ T convert(D data);
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
new file mode 100644
index 0000000000..d216299d18
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.createFieldGetter;
+
+/**
+ * Data converter implementations for the different Iceberg data types.
+ * Each converter turns a NiFi record value into the representation the Iceberg writers expect.
+ */
+public class GenericDataConverters {
+
+    /** Pass-through converter for values that need no transformation between NiFi and Iceberg. */
+    static class SameTypeConverter implements DataConverter<Object, Object> {
+
+        static final SameTypeConverter INSTANCE = new SameTypeConverter();
+
+        @Override
+        public Object convert(Object data) {
+            return data;
+        }
+    }
+
+    /** Converts {@link Time} to {@link LocalTime} for Iceberg TIME columns. */
+    static class TimeConverter implements DataConverter<Time, LocalTime> {
+
+        static final TimeConverter INSTANCE = new TimeConverter();
+
+        @Override
+        public LocalTime convert(Time data) {
+            return data.toLocalTime();
+        }
+    }
+
+    /** Converts {@link Timestamp} to {@link LocalDateTime} for TIMESTAMP (without zone) columns. */
+    static class TimestampConverter implements DataConverter<Timestamp, LocalDateTime> {
+
+        static final TimestampConverter INSTANCE = new TimestampConverter();
+
+        @Override
+        public LocalDateTime convert(Timestamp data) {
+            return data.toLocalDateTime();
+        }
+    }
+
+    /** Converts {@link Timestamp} to a UTC {@link OffsetDateTime} for TIMESTAMP WITH TIMEZONE columns. */
+    static class TimestampWithTimezoneConverter implements DataConverter<Timestamp, OffsetDateTime> {
+
+        static final TimestampWithTimezoneConverter INSTANCE = new TimestampWithTimezoneConverter();
+
+        @Override
+        public OffsetDateTime convert(Timestamp data) {
+            return OffsetDateTime.ofInstant(data.toInstant(), ZoneId.of("UTC"));
+        }
+    }
+
+    /** Serializes a {@link UUID} into its 16-byte big-endian representation (most significant bits first). */
+    static class UUIDtoByteArrayConverter implements DataConverter<UUID, byte[]> {
+
+        static final UUIDtoByteArrayConverter INSTANCE = new UUIDtoByteArrayConverter();
+
+        @Override
+        public byte[] convert(UUID data) {
+            ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+            byteBuffer.putLong(data.getMostSignificantBits());
+            byteBuffer.putLong(data.getLeastSignificantBits());
+            return byteBuffer.array();
+        }
+    }
+
+    /**
+     * Converts a boxed byte array to a primitive byte array for Iceberg fixed[length] columns.
+     * The input length must match the declared fixed length exactly, otherwise an
+     * {@link IllegalArgumentException} is thrown.
+     */
+    static class FixedConverter implements DataConverter<Byte[], byte[]> {
+
+        private final int length;
+
+        FixedConverter(int length) {
+            this.length = length;
+        }
+
+        @Override
+        public byte[] convert(Byte[] data) {
+            Validate.isTrue(data.length == length, String.format("Cannot write byte array of length %s as fixed[%s]", data.length, length));
+            return ArrayUtils.toPrimitive(data);
+        }
+    }
+
+    /** Wraps a boxed byte array in a {@link ByteBuffer} for Iceberg BINARY columns. */
+    static class BinaryConverter implements DataConverter<Byte[], ByteBuffer> {
+
+        static final BinaryConverter INSTANCE = new BinaryConverter();
+
+        @Override
+        public ByteBuffer convert(Byte[] data) {
+            return ByteBuffer.wrap(ArrayUtils.toPrimitive(data));
+        }
+    }
+
+    /**
+     * Validates that a {@link BigDecimal} matches the declared decimal(precision, scale) before writing.
+     * The value is passed through unchanged; no rescaling is attempted.
+     */
+    static class BigDecimalConverter implements DataConverter<BigDecimal, BigDecimal> {
+
+        private final int precision;
+        private final int scale;
+
+        BigDecimalConverter(int precision, int scale) {
+            this.precision = precision;
+            this.scale = scale;
+        }
+
+        @Override
+        public BigDecimal convert(BigDecimal data) {
+            Validate.isTrue(data.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
+            Validate.isTrue(data.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision: %s", precision, scale, data);
+            return data;
+        }
+    }
+
+    /**
+     * Converts a NiFi array field to a {@link List}, applying the element converter to every element.
+     *
+     * @param <T> source element type
+     * @param <S> target element type
+     */
+    static class ArrayConverter<T, S> implements DataConverter<T[], List<S>> {
+        private final DataConverter<T, S> fieldConverter;
+        private final ArrayElementGetter.ElementGetter elementGetter;
+
+        ArrayConverter(DataConverter<T, S> elementConverter, DataType dataType) {
+            this.fieldConverter = elementConverter;
+            this.elementGetter = ArrayElementGetter.createElementGetter(dataType);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public List<S> convert(T[] data) {
+            final int numElements = data.length;
+            List<S> result = new ArrayList<>(numElements);
+            for (int i = 0; i < numElements; i += 1) {
+                result.add(i, fieldConverter.convert((T) elementGetter.getElementOrNull(data, i)));
+            }
+            return result;
+        }
+    }
+
+    /**
+     * Converts a NiFi map field to an Iceberg-compatible {@link Map}, converting keys and values independently.
+     *
+     * @param <K> source key type
+     * @param <V> source value type
+     * @param <L> target key type
+     * @param <B> target value type
+     */
+    static class MapConverter<K, V, L, B> implements DataConverter<Map<K, V>, Map<L, B>> {
+        private final DataConverter<K, L> keyConverter;
+        private final DataConverter<V, B> valueConverter;
+        private final ArrayElementGetter.ElementGetter keyGetter;
+        private final ArrayElementGetter.ElementGetter valueGetter;
+
+        MapConverter(DataConverter<K, L> keyConverter, DataType keyType, DataConverter<V, B> valueConverter, DataType valueType) {
+            this.keyConverter = keyConverter;
+            this.keyGetter = ArrayElementGetter.createElementGetter(keyType);
+            this.valueConverter = valueConverter;
+            this.valueGetter = ArrayElementGetter.createElementGetter(valueType);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public Map<L, B> convert(Map<K, V> data) {
+            final int mapSize = data.size();
+            // Collect keys and values in a single pass over entrySet() so the two arrays are
+            // guaranteed to correspond by index. Iterating keySet() and values() separately
+            // relies on an ordering correspondence the Map contract does not specify.
+            final Object[] keyArray = new Object[mapSize];
+            final Object[] valueArray = new Object[mapSize];
+            int index = 0;
+            for (Map.Entry<K, V> entry : data.entrySet()) {
+                keyArray[index] = entry.getKey();
+                valueArray[index] = entry.getValue();
+                index++;
+            }
+
+            Map<L, B> result = new HashMap<>(mapSize);
+            for (int i = 0; i < mapSize; i += 1) {
+                result.put(keyConverter.convert((K) keyGetter.getElementOrNull(keyArray, i)), valueConverter.convert((V) valueGetter.getElementOrNull(valueArray, i)));
+            }
+
+            return result;
+        }
+    }
+
+    /**
+     * Converts a NiFi {@link Record} to an Iceberg {@link GenericRecord}. Converters, record fields
+     * and schema fields are matched positionally, so the three lists must be aligned by the caller.
+     */
+    static class RecordConverter implements DataConverter<Record, GenericRecord> {
+
+        private final DataConverter<?, ?>[] converters;
+        private final RecordFieldGetter.FieldGetter[] getters;
+
+        private final Types.StructType schema;
+
+        RecordConverter(List<DataConverter<?, ?>> converters, List<RecordField> recordFields, Types.StructType schema) {
+            this.schema = schema;
+            this.converters = (DataConverter<?, ?>[]) Array.newInstance(DataConverter.class, converters.size());
+            this.getters = new RecordFieldGetter.FieldGetter[converters.size()];
+            for (int i = 0; i < converters.size(); i += 1) {
+                final RecordField recordField = recordFields.get(i);
+                this.converters[i] = converters.get(i);
+                this.getters[i] = createFieldGetter(recordField.getDataType(), recordField.getFieldName(), recordField.isNullable());
+            }
+        }
+
+        @Override
+        public GenericRecord convert(Record data) {
+            final GenericRecord template = GenericRecord.create(schema);
+            // GenericRecord.copy() is more performant than GenericRecord.create(StructType) since NAME_MAP_CACHE access is eliminated. Using copy here to gain performance.
+            final GenericRecord result = template.copy();
+
+            for (int i = 0; i < converters.length; i += 1) {
+                result.set(i, convert(data, i, converters[i]));
+            }
+
+            return result;
+        }
+
+        // Applies the positional converter for a single field; the cast is safe because
+        // converters and getters were built from the same field list.
+        @SuppressWarnings("unchecked")
+        private <T, S> S convert(Record record, int pos, DataConverter<T, S> converter) {
+            return converter.convert((T) getters[pos].getFieldOrNull(record));
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
new file mode 100644
index 0000000000..611f4b29b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is responsible for schema traversal and data conversion between NiFi and Iceberg internal record structure.
+ * The constructor walks the Iceberg schema together with the NiFi record schema and builds a converter tree;
+ * {@link #convert(Record)} then applies that tree to each incoming record.
+ * NOTE(review): this file imports org.apache.commons.lang.Validate (commons-lang 2.x) while the sibling
+ * converter classes use org.apache.commons.lang3 — consider switching to lang3 for consistency.
+ */
+public class IcebergRecordConverter {
+
+ private final DataConverter<Record, GenericRecord> converter;
+
+ /**
+ * Converts a NiFi record to an Iceberg record using the converter tree built in the constructor.
+ */
+ public GenericRecord convert(Record record) {
+ return converter.convert(record);
+ }
+
+ /**
+ * Builds the converter tree for the given pair of schemas. The file format is required because
+ * UUID values are written differently for Parquet (see UUIDDataType below).
+ */
+ @SuppressWarnings("unchecked")
+ public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
+ this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
+ }
+
+ private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
+
+ public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
+ return visit(schema, recordDataType, new IcebergSchemaVisitor(), new IcebergPartnerAccessors(fileFormat));
+ }
+
+ @Override
+ public DataConverter<?, ?> schema(Schema schema, DataType dataType, DataConverter<?, ?> converter) {
+ return converter;
+ }
+
+ @Override
+ public DataConverter<?, ?> field(Types.NestedField field, DataType dataType, DataConverter<?, ?> converter) {
+ return converter;
+ }
+
+ // Selects the converter for an Iceberg primitive type; throws for unsupported type ids.
+ @Override
+ public DataConverter<?, ?> primitive(Type.PrimitiveType type, DataType dataType) {
+ if (type.typeId() != null) {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ case INTEGER:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case DATE:
+ case STRING:
+ return GenericDataConverters.SameTypeConverter.INSTANCE;
+ case TIME:
+ return GenericDataConverters.TimeConverter.INSTANCE;
+ case TIMESTAMP:
+ // shouldAdjustToUTC() distinguishes timestamptz from plain timestamp columns.
+ final Types.TimestampType timestampType = (Types.TimestampType) type;
+ if (timestampType.shouldAdjustToUTC()) {
+ return GenericDataConverters.TimestampWithTimezoneConverter.INSTANCE;
+ }
+ return GenericDataConverters.TimestampConverter.INSTANCE;
+ case UUID:
+ // For Parquet the UUID must be serialized to bytes; other formats take the UUID object as-is.
+ final UUIDDataType uuidType = (UUIDDataType) dataType;
+ if (uuidType.getFileFormat() == FileFormat.PARQUET) {
+ return GenericDataConverters.UUIDtoByteArrayConverter.INSTANCE;
+ }
+ return GenericDataConverters.SameTypeConverter.INSTANCE;
+ case FIXED:
+ final Types.FixedType fixedType = (Types.FixedType) type;
+ return new GenericDataConverters.FixedConverter(fixedType.length());
+ case BINARY:
+ return GenericDataConverters.BinaryConverter.INSTANCE;
+ case DECIMAL:
+ final Types.DecimalType decimalType = (Types.DecimalType) type;
+ return new GenericDataConverters.BigDecimalConverter(decimalType.precision(), decimalType.scale());
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
+ }
+ }
+ throw new UnsupportedOperationException("Missing type id from PrimitiveType " + type);
+ }
+
+ @Override
+ public DataConverter<?, ?> struct(Types.StructType type, DataType dataType, List<DataConverter<?, ?>> converters) {
+ Validate.notNull(type, "Can not create reader for null type");
+ final List<RecordField> recordFields = ((RecordDataType) dataType).getChildSchema().getFields();
+ return new GenericDataConverters.RecordConverter(converters, recordFields, type);
+ }
+
+ @Override
+ public DataConverter<?, ?> list(Types.ListType listTypeInfo, DataType dataType, DataConverter<?, ?> converter) {
+ return new GenericDataConverters.ArrayConverter<>(converter, ((ArrayDataType) dataType).getElementType());
+ }
+
+ // Map keys are always handled as STRING here, matching mapKeyPartner below.
+ @Override
+ public DataConverter<?, ?> map(Types.MapType mapType, DataType dataType, DataConverter<?, ?> keyConverter, DataConverter<?, ?> valueConverter) {
+ return new GenericDataConverters.MapConverter<>(keyConverter, RecordFieldType.STRING.getDataType(), valueConverter, ((MapDataType) dataType).getValueType());
+ }
+ }
+
+ /**
+ * Supplies the NiFi-side partner DataType for each Iceberg schema element the visitor above walks.
+ */
+ public static class IcebergPartnerAccessors implements SchemaWithPartnerVisitor.PartnerAccessors<DataType> {
+ private final FileFormat fileFormat;
+
+ IcebergPartnerAccessors(FileFormat fileFormat) {
+ this.fileFormat = fileFormat;
+ }
+
+ @Override
+ public DataType fieldPartner(DataType dataType, int fieldId, String name) {
+ Validate.isTrue(dataType instanceof RecordDataType, String.format("Invalid record: %s is not a record", dataType));
+ final RecordDataType recordType = (RecordDataType) dataType;
+ final Optional<RecordField> recordField = recordType.getChildSchema().getField(name);
+
+ Validate.isTrue(recordField.isPresent(), String.format("Cannot find record field with name %s", name));
+ final RecordField field = recordField.get();
+
+ // UUID fields are wrapped so the primitive converter can decide based on the target file format.
+ if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)) {
+ return new UUIDDataType(field.getDataType(), fileFormat);
+ }
+
+ return field.getDataType();
+ }
+
+ @Override
+ public DataType mapKeyPartner(DataType dataType) {
+ return RecordFieldType.STRING.getDataType();
+ }
+
+ @Override
+ public DataType mapValuePartner(DataType dataType) {
+ Validate.isTrue(dataType instanceof MapDataType, String.format("Invalid map: %s is not a map", dataType));
+ final MapDataType mapType = (MapDataType) dataType;
+ return mapType.getValueType();
+ }
+
+ @Override
+ public DataType listElementPartner(DataType dataType) {
+ Validate.isTrue(dataType instanceof ArrayDataType, String.format("Invalid array: %s is not an array", dataType));
+ final ArrayDataType arrayType = (ArrayDataType) dataType;
+ return arrayType.getElementType();
+ }
+ }
+
+ /**
+ * Parquet writer expects the UUID value in different format, so it needs to be converted differently: <a href="https://github.com/apache/iceberg/issues/1881">#1881</a>
+ */
+ private static class UUIDDataType extends DataType {
+
+ private final FileFormat fileFormat;
+
+ UUIDDataType(DataType dataType, FileFormat fileFormat) {
+ super(dataType.getFieldType(), dataType.getFormat());
+ this.fileFormat = fileFormat;
+ }
+
+ public FileFormat getFileFormat() {
+ return fileFormat;
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
new file mode 100644
index 0000000000..ca50c49f89
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.time.ZoneId;
+
+public class RecordFieldGetter {
+
+    /**
+     * Builds an accessor that reads the named field from a {@link Record} and converts it
+     * to the Java type corresponding to the given {@link DataType}.
+     *
+     * @param dataType   the element type of the field
+     * @param fieldName  the name of the field
+     * @param isNullable indicates if the field's value is nullable; when true the returned
+     *                   getter short-circuits to null instead of invoking the conversion
+     */
+    public static FieldGetter createFieldGetter(DataType dataType, String fieldName, boolean isNullable) {
+        final FieldGetter getter;
+        switch (dataType.getFieldType()) {
+            case STRING:
+                getter = record -> record.getAsString(fieldName);
+                break;
+            case CHAR:
+                getter = record -> DataTypeUtils.toCharacter(record.getValue(fieldName), fieldName);
+                break;
+            case BOOLEAN:
+                getter = record -> record.getAsBoolean(fieldName);
+                break;
+            case DECIMAL:
+                getter = record -> DataTypeUtils.toBigDecimal(record.getValue(fieldName), fieldName);
+                break;
+            case BYTE:
+                getter = record -> DataTypeUtils.toByte(record.getValue(fieldName), fieldName);
+                break;
+            case SHORT:
+                getter = record -> DataTypeUtils.toShort(record.getValue(fieldName), fieldName);
+                break;
+            case INT:
+                getter = record -> record.getAsInt(fieldName);
+                break;
+            case DATE:
+                getter = record -> DataTypeUtils.toLocalDate(record.getValue(fieldName), () -> DataTypeUtils.getDateTimeFormatter(dataType.getFormat(), ZoneId.systemDefault()), fieldName);
+                break;
+            case TIME:
+                getter = record -> DataTypeUtils.toTime(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(dataType.getFormat()), fieldName);
+                break;
+            case LONG:
+                getter = record -> record.getAsLong(fieldName);
+                break;
+            case BIGINT:
+                getter = record -> DataTypeUtils.toBigInt(record.getValue(fieldName), fieldName);
+                break;
+            case FLOAT:
+                getter = record -> record.getAsFloat(fieldName);
+                break;
+            case DOUBLE:
+                getter = record -> record.getAsDouble(fieldName);
+                break;
+            case TIMESTAMP:
+                getter = record -> DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(dataType.getFormat()), fieldName);
+                break;
+            case UUID:
+                getter = record -> DataTypeUtils.toUUID(record.getValue(fieldName));
+                break;
+            case ARRAY:
+                getter = record -> DataTypeUtils.toArray(record.getValue(fieldName), fieldName, ((ArrayDataType) dataType).getElementType());
+                break;
+            case MAP:
+                getter = record -> DataTypeUtils.toMap(record.getValue(fieldName), fieldName);
+                break;
+            case RECORD:
+                getter = record -> record.getAsRecord(fieldName, ((RecordDataType) dataType).getChildSchema());
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+
+        if (!isNullable) {
+            return getter;
+        }
+
+        // Nullable fields: return null immediately instead of running the conversion.
+        return record -> record.getValue(fieldName) == null ? null : getter.getFieldOrNull(record);
+    }
+
+    /**
+     * Accessor for getting the field of a record during runtime.
+     */
+    public interface FieldGetter extends Serializable {
+        @Nullable
+        Object getFieldOrNull(Record record);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergPartitionedWriter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergPartitionedWriter.java
new file mode 100644
index 0000000000..fca8b92fc8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergPartitionedWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+
+/**
+ * This class adapts {@link Record} for partitioned writing.
+ */
+public class IcebergPartitionedWriter extends PartitionedFanoutWriter<Record> {
+
+ // Single key instance reused for every incoming record to avoid per-record allocation.
+ private final PartitionKey partitionKey;
+ // Wraps each record so the partition transforms see Iceberg's internal representation.
+ private final InternalRecordWrapper wrapper;
+
+ IcebergPartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory,
+ FileIO io, long targetFileSize, Schema schema) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ this.partitionKey = new PartitionKey(spec, schema);
+ this.wrapper = new InternalRecordWrapper(schema.asStruct());
+ }
+
+ /**
+ * Computes the partition key for the given record.
+ * NOTE(review): the same mutable PartitionKey instance is returned on every call; this assumes
+ * the superclass copies the key when it retains one per partition — confirm against
+ * PartitionedFanoutWriter's implementation.
+ */
+ @Override
+ protected PartitionKey partition(Record record) {
+ partitionKey.partition(wrapper.wrap(record));
+ return partitionKey;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergTaskWriterFactory.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergTaskWriterFactory.java
new file mode 100644
index 0000000000..4dc2bbb6c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/writer/IcebergTaskWriterFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.util.PropertyUtil;
+
+/**
+ * Factory class to create the suitable {@link TaskWriter} based on the {@link Table}'s properties.
+ */
+public class IcebergTaskWriterFactory {
+
+    private final Schema schema;
+    private final PartitionSpec spec;
+    private final FileIO io;
+    private final long targetFileSize;
+    private final FileFormat fileFormat;
+    private final FileAppenderFactory<Record> appenderFactory;
+    private final OutputFileFactory outputFileFactory;
+
+    /**
+     * @param table          the target Iceberg table
+     * @param taskId         unique id of the writing task, used for output file naming
+     * @param fileFormat     data file format to produce
+     * @param targetFileSize target data file size in bytes as a decimal string, or null to fall back
+     *                       to the table's write.target-file-size-bytes property (or its default)
+     * @throws NumberFormatException if targetFileSize is non-null and not a parsable long
+     */
+    public IcebergTaskWriterFactory(Table table, long taskId, FileFormat fileFormat, String targetFileSize) {
+        this.schema = table.schema();
+        this.spec = table.spec();
+        this.io = table.io();
+        this.fileFormat = fileFormat;
+
+        this.targetFileSize = targetFileSize != null ? Long.parseLong(targetFileSize) :
+                PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+        // Reuse the spec captured above instead of re-reading it from the table.
+        this.outputFileFactory = OutputFileFactory.builderFor(table, spec.specId(), taskId).format(fileFormat).build();
+        this.appenderFactory = new GenericAppenderFactory(schema, spec);
+    }
+
+    /**
+     * @return an {@link UnpartitionedWriter} for unpartitioned tables, otherwise a fanout writer
+     *         that routes each record to its partition
+     */
+    public TaskWriter<Record> create() {
+        if (spec.isUnpartitioned()) {
+            return new UnpartitionedWriter<>(spec, fileFormat, appenderFactory, outputFileFactory, io, targetFileSize);
+        } else {
+            return new IcebergPartitionedWriter(spec, fileFormat, appenderFactory, outputFileFactory, io, targetFileSize, schema);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..f884831b07
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.processors.iceberg.PutIceberg
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java
new file mode 100644
index 0000000000..0b5403986b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+public class TestFileAbort {
+
+ private static final Namespace NAMESPACE = Namespace.of("default");
+ private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "abort");
+
+ private static final Schema ABORT_SCHEMA = new Schema(
+ Types.NestedField.required(0, "id", Types.IntegerType.get())
+ );
+
+ @DisabledOnOs(WINDOWS)
+ @Test
+ public void abortUncommittedFiles() throws IOException {
+ Table table = initCatalog();
+
+ List<RecordField> recordFields = Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType()));
+ RecordSchema abortSchema = new SimpleRecordSchema(recordFields);
+
+ List<MapRecord> recordList = new ArrayList<>();
+ recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 1)));
+ recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 2)));
+ recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 3)));
+ recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 4)));
+ recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 5)));
+
+ IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null);
+ TaskWriter<Record> taskWriter = taskWriterFactory.create();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
+
+ for (MapRecord record : recordList) {
+ taskWriter.write(recordConverter.convert(record));
+ }
+
+ DataFile[] dataFiles = taskWriter.dataFiles();
+
+ // DataFiles written by the taskWriter should exist
+ for (DataFile dataFile : dataFiles) {
+ Assertions.assertTrue(Files.exists(Paths.get(dataFile.path().toString())));
+ }
+
+ PutIceberg icebergProcessor = new PutIceberg();
+ icebergProcessor.abort(taskWriter.dataFiles(), table);
+
+ // DataFiles shouldn't exist after aborting them
+ for (DataFile dataFile : dataFiles) {
+ Assertions.assertFalse(Files.exists(Paths.get(dataFile.path().toString())));
+ }
+ }
+
+ private Table initCatalog() throws IOException {
+ TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
+ Catalog catalog = catalogService.getCatalog();
+
+ return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
new file mode 100644
index 0000000000..1308316e16
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static java.io.File.createTempFile;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+public class TestIcebergRecordConverter {
+
+ private OutputFile tempFile;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ tempFile = Files.localOutput(createTempFile("test", null));
+ }
+
+ @AfterEach
+ public void tearDown() {
+ File file = new File(tempFile.location());
+ file.deleteOnExit();
+ }
+
+ private static final Schema STRUCT = new Schema(
+ Types.NestedField.required(0, "struct", Types.StructType.of(
+ Types.NestedField.required(1, "nested_struct", Types.StructType.of(
+ Types.NestedField.required(2, "string", Types.StringType.get()),
+ Types.NestedField.required(3, "integer", Types.IntegerType.get()))
+ )
+ ))
+ );
+
+ private static final Schema LIST = new Schema(
+ Types.NestedField.required(0, "list", Types.ListType.ofRequired(
+ 1, Types.ListType.ofRequired(
+ 2, Types.StringType.get())
+ )
+ )
+ );
+
+ private static final Schema MAP = new Schema(
+ Types.NestedField.required(0, "map", Types.MapType.ofRequired(
+ 1, 2, Types.StringType.get(), Types.MapType.ofRequired(
+ 3, 4, Types.StringType.get(), Types.LongType.get()
+ )
+ ))
+ );
+
+ private static final Schema PRIMITIVES = new Schema(
+ Types.NestedField.optional(0, "string", Types.StringType.get()),
+ Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "float", Types.FloatType.get()),
+ Types.NestedField.optional(3, "long", Types.LongType.get()),
+ Types.NestedField.optional(4, "double", Types.DoubleType.get()),
+ Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)),
+ Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
+ Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)),
+ Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
+ Types.NestedField.optional(9, "date", Types.DateType.get()),
+ Types.NestedField.optional(10, "time", Types.TimeType.get()),
+ Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
+ Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(13, "uuid", Types.UUIDType.get())
+ );
+
+ private static RecordSchema getStructSchema() {
+ List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("struct", new RecordDataType(getNestedStructSchema())));
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private static RecordSchema getNestedStructSchema() {
+ List<RecordField> nestedFields = new ArrayList<>();
+ nestedFields.add(new RecordField("nested_struct", new RecordDataType(getNestedStructSchema2())));
+
+ return new SimpleRecordSchema(nestedFields);
+ }
+
+ private static RecordSchema getNestedStructSchema2() {
+ List<RecordField> nestedFields2 = new ArrayList<>();
+ nestedFields2.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
+ nestedFields2.add(new RecordField("integer", RecordFieldType.INT.getDataType()));
+
+ return new SimpleRecordSchema(nestedFields2);
+ }
+
+ private static RecordSchema getListSchema() {
+ List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("list", new ArrayDataType(
+ new ArrayDataType(RecordFieldType.STRING.getDataType()))));
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private static RecordSchema getMapSchema() {
+ List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("map", new MapDataType(
+ new MapDataType(RecordFieldType.LONG.getDataType()))));
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private static RecordSchema getPrimitivesSchema() {
+ List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("integer", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
+ fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
+ fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
+ fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()));
+ fields.add(new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+ fields.add(new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+ fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+ fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
+ fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+ fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType()));
+ fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType()));
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private static Record setupStructTestRecord() {
+ Map<String, Object> nestedValues2 = new HashMap<>();
+ nestedValues2.put("string", "Test String");
+ nestedValues2.put("integer", 10);
+ MapRecord nestedRecord2 = new MapRecord(getNestedStructSchema2(), nestedValues2);
+
+ Map<String, Object> nestedValues = new HashMap<>();
+ nestedValues.put("nested_struct", nestedRecord2);
+ MapRecord nestedRecord = new MapRecord(getNestedStructSchema(), nestedValues);
+
+ Map<String, Object> values = new HashMap<>();
+ values.put("struct", nestedRecord);
+ return new MapRecord(getStructSchema(), values);
+ }
+
+ private static Record setupListTestRecord() {
+ List<String> nestedList = new ArrayList<>();
+ nestedList.add("Test String");
+
+ List<Collection> list = new ArrayList<>();
+ list.add(nestedList);
+
+ Map<String, Object> values = new HashMap<>();
+ values.put("list", list);
+
+ return new MapRecord(getListSchema(), values);
+ }
+
+ private static Record setupMapTestRecord() {
+ Map<String, Long> nestedMap = new HashMap<>();
+ nestedMap.put("nested_key", 42L);
+
+ Map<String, Map> map = new HashMap<>();
+ map.put("key", nestedMap);
+
+ Map<String, Object> values = new HashMap<>();
+ values.put("map", map);
+
+ return new MapRecord(getMapSchema(), values);
+ }
+
+ private static Record setupPrimitivesTestRecord(RecordSchema schema) {
+ LocalDate localDate = LocalDate.of(2017, 4, 4);
+ LocalTime localTime = LocalTime.of(14, 20, 33);
+ LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+ Map<String, Object> values = new HashMap<>();
+ values.put("string", "Test String");
+ values.put("integer", 8);
+ values.put("float", 1.23456F);
+ values.put("long", 42L);
+ values.put("double", 3.14159D);
+ values.put("decimal", new BigDecimal("12345678.12"));
+ values.put("boolean", true);
+ values.put("fixed", "hello".getBytes());
+ values.put("binary", "hello".getBytes());
+ values.put("date", localDate);
+ values.put("time", Time.valueOf(localTime));
+ values.put("timestamp", Timestamp.from(offsetDateTime.toInstant()));
+ values.put("timestampTz", Timestamp.valueOf(localDateTime));
+ values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
+
+ return new MapRecord(schema, values);
+ }
+
+ @Test
+ public void testPrimitivesAvro() throws IOException {
+ RecordSchema nifiSchema = getPrimitivesSchema();
+ Record record = setupPrimitivesTestRecord(nifiSchema);
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.AVRO);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToAvro(PRIMITIVES, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromAvro(PRIMITIVES, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ GenericRecord resultRecord = results.get(0);
+
+ LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+ Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
+ Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
+ Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
+ Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
+ Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
+ Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
+ Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
+ Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
+ Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
+ Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
+ Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
+ Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
+ Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
+ Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
+ }
+
+ @DisabledOnOs(WINDOWS)
+ @Test
+ public void testPrimitivesOrc() throws IOException {
+ RecordSchema nifiSchema = getPrimitivesSchema();
+ Record record = setupPrimitivesTestRecord(nifiSchema);
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.ORC);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToOrc(PRIMITIVES, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromOrc(PRIMITIVES, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ GenericRecord resultRecord = results.get(0);
+
+ LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+ Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
+ Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
+ Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
+ Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
+ Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
+ Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
+ Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
+ Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
+ Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
+ Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
+ Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
+ Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
+ Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
+ Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
+ }
+
+ @Test
+ public void testPrimitivesParquet() throws IOException {
+ RecordSchema nifiSchema = getPrimitivesSchema();
+ Record record = setupPrimitivesTestRecord(nifiSchema);
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.PARQUET);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToParquet(PRIMITIVES, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromParquet(PRIMITIVES, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ GenericRecord resultRecord = results.get(0);
+
+ LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+ Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
+ Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
+ Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
+ Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
+ Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
+ Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
+ Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
+ Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
+ Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
+ Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
+ Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
+ Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
+ Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
+ Assertions.assertArrayEquals(resultRecord.get(13, byte[].class), new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
+ }
+
+ @Test
+ public void testStructAvro() throws IOException {
+ RecordSchema nifiSchema = getStructSchema();
+ Record record = setupStructTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.AVRO);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToAvro(STRUCT, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromAvro(STRUCT, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
+ GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedRecord.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
+ GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
+
+ Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
+ Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
+ }
+
+ @DisabledOnOs(WINDOWS)
+ @Test
+ public void testStructOrc() throws IOException {
+ RecordSchema nifiSchema = getStructSchema();
+ Record record = setupStructTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.ORC);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToOrc(STRUCT, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromOrc(STRUCT, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
+ GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedRecord.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
+ GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
+
+ Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
+ Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
+ }
+
+ @Test
+ public void testStructParquet() throws IOException {
+ RecordSchema nifiSchema = getStructSchema();
+ Record record = setupStructTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.PARQUET);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToParquet(STRUCT, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromParquet(STRUCT, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
+ GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedRecord.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
+ GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
+
+ Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
+ Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
+ }
+
+ @Test
+ public void testListAvro() throws IOException {
+ RecordSchema nifiSchema = getListSchema();
+ Record record = setupListTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.AVRO);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToAvro(LIST, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromAvro(LIST, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(List.class, resultRecord.get(0));
+ List nestedList = (List) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedList.size(), 1);
+ Assertions.assertInstanceOf(List.class, nestedList.get(0));
+ List baseList = (List) nestedList.get(0);
+
+ Assertions.assertEquals(baseList.get(0), "Test String");
+ }
+
+ @DisabledOnOs(WINDOWS)
+ @Test
+ public void testListOrc() throws IOException {
+ RecordSchema nifiSchema = getListSchema();
+ Record record = setupListTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.ORC);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToOrc(LIST, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromOrc(LIST, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(List.class, resultRecord.get(0));
+ List nestedList = (List) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedList.size(), 1);
+ Assertions.assertInstanceOf(List.class, nestedList.get(0));
+ List baseList = (List) nestedList.get(0);
+
+ Assertions.assertEquals(baseList.get(0), "Test String");
+ }
+
+ @Test
+ public void testListParquet() throws IOException {
+ RecordSchema nifiSchema = getListSchema();
+ Record record = setupListTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.PARQUET);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToParquet(LIST, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromParquet(LIST, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(List.class, resultRecord.get(0));
+ List nestedList = (List) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedList.size(), 1);
+ Assertions.assertInstanceOf(List.class, nestedList.get(0));
+ List baseList = (List) nestedList.get(0);
+
+ Assertions.assertEquals(baseList.get(0), "Test String");
+ }
+
+ @Test
+ public void testMapAvro() throws IOException {
+ RecordSchema nifiSchema = getMapSchema();
+ Record record = setupMapTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.AVRO);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToAvro(MAP, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromAvro(MAP, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
+ Map nestedMap = (Map) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedMap.size(), 1);
+ Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
+ Map baseMap = (Map) nestedMap.get("key");
+
+ Assertions.assertEquals(baseMap.get("nested_key"), 42L);
+ }
+
+ @DisabledOnOs(WINDOWS)
+ @Test
+ public void testMapOrc() throws IOException {
+ RecordSchema nifiSchema = getMapSchema();
+ Record record = setupMapTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.ORC);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToOrc(MAP, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromOrc(MAP, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
+ Map nestedMap = (Map) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedMap.size(), 1);
+ Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
+ Map baseMap = (Map) nestedMap.get("key");
+
+ Assertions.assertEquals(baseMap.get("nested_key"), 42L);
+ }
+
+ @Test
+ public void testMapParquet() throws IOException {
+ RecordSchema nifiSchema = getMapSchema();
+ Record record = setupMapTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.PARQUET);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeToParquet(MAP, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFromParquet(MAP, tempFile.toInputFile());
+
+ Assertions.assertEquals(results.size(), 1);
+ Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
+ GenericRecord resultRecord = results.get(0);
+
+ Assertions.assertEquals(resultRecord.size(), 1);
+ Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
+ Map nestedMap = (Map) resultRecord.get(0);
+
+ Assertions.assertEquals(nestedMap.size(), 1);
+ Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
+ Map baseMap = (Map) nestedMap.get("key");
+
+ Assertions.assertEquals(baseMap.get("nested_key"), 42L);
+ }
+
+ @Test
+ public void testSchemaMismatchAvro() {
+ RecordSchema nifiSchema = getListSchema();
+ Record record = setupListTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.AVRO);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ DataFileWriter.AppendWriteException e = assertThrows(DataFileWriter.AppendWriteException.class, () -> writeToAvro(STRUCT, genericRecord, tempFile));
+ assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"), e.getMessage());
+ }
+
+ @DisabledOnOs(WINDOWS)
+ @Test
+ public void testSchemaMismatchOrc() {
+ RecordSchema nifiSchema = getListSchema();
+ Record record = setupListTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.ORC);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ ClassCastException e = assertThrows(ClassCastException.class, () -> writeToOrc(STRUCT, genericRecord, tempFile));
+ assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
+ }
+
+ @Test
+ public void testSchemaMismatchParquet() {
+ RecordSchema nifiSchema = getListSchema();
+ Record record = setupListTestRecord();
+
+ IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.PARQUET);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ ClassCastException e = assertThrows(ClassCastException.class, () -> writeToParquet(STRUCT, genericRecord, tempFile));
+ assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
+ }
+
+ public void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
+ try (FileAppender<GenericRecord> appender = Avro.write(outputFile)
+ .schema(schema)
+ .createWriterFunc(DataWriter::create)
+ .overwrite()
+ .build()) {
+ appender.add(record);
+ }
+ }
+
+ public ArrayList<GenericRecord> readFromAvro(Schema schema, InputFile inputFile) throws IOException {
+ try (AvroIterable<GenericRecord> reader = Avro.read(inputFile)
+ .project(schema)
+ .createReaderFunc(DataReader::create)
+ .build()) {
+ return Lists.newArrayList(reader);
+ }
+ }
+
+ public void writeToOrc(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
+ try (FileAppender<GenericRecord> appender = ORC.write(outputFile)
+ .schema(schema)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .overwrite()
+ .build()) {
+ appender.add(record);
+ }
+ }
+
+ public ArrayList<GenericRecord> readFromOrc(Schema schema, InputFile inputFile) throws IOException {
+ try (CloseableIterable<GenericRecord> reader = ORC.read(inputFile)
+ .project(schema)
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+ .build()) {
+ return Lists.newArrayList(reader);
+ }
+ }
+
+ public void writeToParquet(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
+ try (FileAppender<GenericRecord> appender = Parquet.write(outputFile)
+ .schema(schema)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .build()) {
+ appender.add(record);
+ }
+ }
+
+ public ArrayList<GenericRecord> readFromParquet(Schema schema, InputFile inputFile) throws IOException {
+ try (CloseableIterable<GenericRecord> reader = Parquet.read(inputFile)
+ .project(schema)
+ .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+ .build()) {
+ return Lists.newArrayList(reader);
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
new file mode 100644
index 0000000000..9b75ee9ef6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+/**
+ * Tests PutIceberg against a HadoopCatalog. Each test feeds four records (three distinct
+ * year/month/day values) through the processor and verifies that the resulting table is
+ * partitioned by the chosen time transform, producing three data files / partition folders.
+ */
+public class TestPutIcebergWithHadoopCatalog {
+
+ private TestRunner runner;
+ private PutIceberg processor;
+ private Schema inputSchema;
+
+ private static final Namespace NAMESPACE = Namespace.of("default");
+
+ private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "date");
+
+ // Target Iceberg schema: one time, one timestamp-with-zone, and one date column.
+ private static final org.apache.iceberg.Schema DATE_SCHEMA = new org.apache.iceberg.Schema(
+ Types.NestedField.required(1, "timeMicros", Types.TimeType.get()),
+ Types.NestedField.required(2, "timestampMicros", Types.TimestampType.withZone()),
+ Types.NestedField.required(3, "date", Types.DateType.get())
+ );
+
+ // Loads the Avro schema backing the mock record reader and creates a fresh processor instance.
+ @BeforeEach
+ public void setUp() throws Exception {
+ String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")), StandardCharsets.UTF_8);
+ inputSchema = new Schema.Parser().parse(avroSchema);
+
+ processor = new PutIceberg();
+ }
+
+ // Registers a MockRecordParser with four records: two share 2015-02-02, plus one each in
+ // 2016 and 2017, so year/month/day transforms all yield exactly three partitions.
+ private void initRecordReader() throws InitializationException {
+ MockRecordParser readerFactory = new MockRecordParser();
+ RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
+
+ for (RecordField recordField : recordSchema.getFields()) {
+ readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ }
+
+ readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2015-02-02 15:30:30.800"), Date.valueOf("2015-02-02"));
+ readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2015-02-02 15:30:30.800"), Date.valueOf("2015-02-02"));
+ readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2016-01-02 15:30:30.800"), Date.valueOf("2016-01-02"));
+ readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2017-01-10 15:30:30.800"), Date.valueOf("2017-01-10"));
+
+ runner.addControllerService("mock-reader-factory", readerFactory);
+ runner.enableControllerService(readerFactory);
+
+ runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
+ }
+
+ // Creates the target table (format v2, requested file format) in a temp-dir HadoopCatalog and
+ // registers the catalog controller service on the runner. Returns the catalog for assertions.
+ private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
+ TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
+ Catalog catalog = catalogService.getCatalog();
+
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put(TableProperties.FORMAT_VERSION, "2");
+ tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
+
+ catalog.createTable(TABLE_IDENTIFIER, DATE_SCHEMA, spec, tableProperties);
+
+ runner.addControllerService("catalog-service", catalogService);
+ runner.enableControllerService(catalogService);
+
+ runner.setProperty(PutIceberg.CATALOG, "catalog-service");
+
+ return catalog;
+ }
+
+ // year("date") transform: expects partition folders date_year=2015/2016/2017.
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "orc", "parquet"})
+ public void onTriggerYearTransform(String fileFormat) throws Exception {
+ PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
+ .year("date")
+ .build();
+
+ runner = TestRunners.newTestRunner(processor)
+ initRecordReader();
+ Catalog catalog = initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
+ runner.setProperty(PutIceberg.TABLE_NAME, "date");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+ runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+ Assertions.assertTrue(table.spec().isPartitioned());
+ Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ validateNumberOfDataFiles(table.location(), 3);
+ validatePartitionFolders(table.location(), Arrays.asList("date_year=2015", "date_year=2016", "date_year=2017"));
+ }
+
+ // month("timestampMicros") transform: expects folders timestampMicros_month=YYYY-MM.
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "orc", "parquet"})
+ public void onTriggerMonthTransform(String fileFormat) throws Exception {
+ PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
+ .month("timestampMicros")
+ .build();
+
+ runner = TestRunners.newTestRunner(processor);
+ initRecordReader();
+ Catalog catalog = initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
+ runner.setProperty(PutIceberg.TABLE_NAME, "date");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+ runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+ Assertions.assertTrue(table.spec().isPartitioned());
+ Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ validateNumberOfDataFiles(table.location(), 3);
+ validatePartitionFolders(table.location(), Arrays.asList(
+ "timestampMicros_month=2015-02", "timestampMicros_month=2016-01", "timestampMicros_month=2017-01"));
+ }
+
+ // day("timestampMicros") transform: expects folders timestampMicros_day=YYYY-MM-DD.
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "orc", "parquet"})
+ public void onTriggerDayTransform(String fileFormat) throws Exception {
+ PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
+ .day("timestampMicros")
+ .build();
+
+ runner = TestRunners.newTestRunner(processor);
+ initRecordReader();
+ Catalog catalog = initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
+ runner.setProperty(PutIceberg.TABLE_NAME, "date");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+ runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+ Assertions.assertTrue(table.spec().isPartitioned());
+ Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ validateNumberOfDataFiles(table.location(), 3);
+ validatePartitionFolders(table.location(), Arrays.asList(
+ "timestampMicros_day=2015-02-02", "timestampMicros_day=2016-01-02", "timestampMicros_day=2017-01-10"));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
new file mode 100644
index 0000000000..1f7a79258b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
+import org.apache.nifi.processors.iceberg.metastore.ThriftMetastore;
+import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+/**
+ * Tests PutIceberg against a HiveCatalog backed by an embedded Thrift metastore
+ * (ThriftMetastore JUnit extension). Each test writes four user records and verifies
+ * row content, data-file count, and (where applicable) partition folder layout.
+ */
+public class TestPutIcebergWithHiveCatalog {
+
+ private TestRunner runner;
+ private PutIceberg processor;
+ private Schema inputSchema;
+
+ // Embedded Hive metastore lifecycle is managed per test instance by the extension.
+ @RegisterExtension
+ public ThriftMetastore metastore = new ThriftMetastore();
+
+ private static final Namespace NAMESPACE = Namespace.of("test_metastore");
+
+ private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "users");
+
+ private static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "name", Types.StringType.get()),
+ Types.NestedField.required(3, "department", Types.StringType.get())
+ );
+
+ // Loads the Avro schema backing the mock record reader and creates a fresh processor instance.
+ @BeforeEach
+ public void setUp() throws Exception {
+ String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/user.avsc")), StandardCharsets.UTF_8);
+ inputSchema = new Schema.Parser().parse(avroSchema);
+
+ processor = new PutIceberg();
+ }
+
+ // Registers a MockRecordParser with four users across three departments
+ // (Finance x2, Marketing, Sales).
+ private void initRecordReader() throws InitializationException {
+ MockRecordParser readerFactory = new MockRecordParser();
+ RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
+
+ for (RecordField recordField : recordSchema.getFields()) {
+ readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ }
+
+ readerFactory.addRecord(0, "John", "Finance");
+ readerFactory.addRecord(1, "Jill", "Finance");
+ readerFactory.addRecord(2, "James", "Marketing");
+ readerFactory.addRecord(3, "Joana", "Sales");
+
+ runner.addControllerService("mock-reader-factory", readerFactory);
+ runner.enableControllerService(readerFactory);
+
+ runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
+ }
+
+ // Creates the target table (format v2, requested file format) through a HiveCatalog pointed at
+ // the embedded metastore and registers the catalog controller service on the runner.
+ private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
+ TestHiveCatalogService catalogService = new TestHiveCatalogService(metastore.getThriftConnectionUri(), metastore.getWarehouseLocation());
+ Catalog catalog = catalogService.getCatalog();
+
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put(TableProperties.FORMAT_VERSION, "2");
+ tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
+
+ catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
+
+ runner.addControllerService("catalog-service", catalogService);
+ runner.enableControllerService(catalogService);
+
+ runner.setProperty(PutIceberg.CATALOG, "catalog-service");
+
+ return catalog;
+ }
+
+ // bucket("department", 3): three departments hash into three bucket folders.
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "orc", "parquet"})
+ public void onTriggerPartitioned(String fileFormat) throws Exception {
+ PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+ .bucket("department", 3)
+ .build();
+
+ runner = TestRunners.newTestRunner(processor);
+ initRecordReader();
+ Catalog catalog = initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
+ runner.setProperty(PutIceberg.TABLE_NAME, "users");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+ List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+ .add(0, "John", "Finance")
+ .add(1, "Jill", "Finance")
+ .add(2, "James", "Marketing")
+ .add(3, "Joana", "Sales")
+ .build();
+
+ runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+ // Table location is a URI (e.g. file:/...); strip the scheme for filesystem checks.
+ String tableLocation = new URI(table.location()).getPath();
+ Assertions.assertTrue(table.spec().isPartitioned());
+ Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ validateData(table, expectedRecords, 0);
+ validateNumberOfDataFiles(tableLocation, 3);
+ validatePartitionFolders(tableLocation, Arrays.asList(
+ "department_bucket=0", "department_bucket=1", "department_bucket=2"));
+ }
+
+ // identity("department"): one folder per distinct department value.
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "orc", "parquet"})
+ public void onTriggerIdentityPartitioned(String fileFormat) throws Exception {
+ PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+ .identity("department")
+ .build();
+
+ runner = TestRunners.newTestRunner(processor);
+ initRecordReader();
+ Catalog catalog = initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
+ runner.setProperty(PutIceberg.TABLE_NAME, "users");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+ List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+ .add(0, "John", "Finance")
+ .add(1, "Jill", "Finance")
+ .add(2, "James", "Marketing")
+ .add(3, "Joana", "Sales")
+ .build();
+
+ runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+ String tableLocation = new URI(table.location()).getPath();
+ Assertions.assertTrue(table.spec().isPartitioned());
+ Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ validateData(table, expectedRecords, 0);
+ validateNumberOfDataFiles(tableLocation, 3);
+ validatePartitionFolders(tableLocation, Arrays.asList(
+ "department=Finance", "department=Marketing", "department=Sales"));
+ }
+
+ // identity("name") + identity("department"): four unique names => four nested partition paths.
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "orc", "parquet"})
+ public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception {
+ PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+ .identity("name")
+ .identity("department")
+ .build();
+
+ runner = TestRunners.newTestRunner(processor);
+ initRecordReader();
+ Catalog catalog = initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
+ runner.setProperty(PutIceberg.TABLE_NAME, "users");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+ List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+ .add(0, "John", "Finance")
+ .add(1, "Jill", "Finance")
+ .add(2, "James", "Marketing")
+ .add(3, "Joana", "Sales")
+ .build();
+
+ runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+ String tableLocation = new URI(table.location()).getPath();
+ Assertions.assertTrue(table.spec().isPartitioned());
+ Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ validateData(table, expectedRecords, 0);
+ validateNumberOfDataFiles(tableLocation, 4);
+ validatePartitionFolders(tableLocation, Arrays.asList(
+ "name=James/department=Marketing/",
+ "name=Jill/department=Finance/",
+ "name=Joana/department=Sales/",
+ "name=John/department=Finance/"
+ ));
+ }
+
+ // Unpartitioned table: all four records land in a single data file.
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "orc", "parquet"})
+ public void onTriggerUnPartitioned(String fileFormat) throws Exception {
+ runner = TestRunners.newTestRunner(processor);
+ initRecordReader();
+ Catalog catalog = initCatalog(PartitionSpec.unpartitioned(), fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
+ runner.setProperty(PutIceberg.TABLE_NAME, "users");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+ List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+ .add(0, "John", "Finance")
+ .add(1, "Jill", "Finance")
+ .add(2, "James", "Marketing")
+ .add(3, "Joana", "Sales")
+ .build();
+
+ runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+
+ Assertions.assertTrue(table.spec().isUnpartitioned());
+ Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ validateData(table, expectedRecords, 0);
+ validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
new file mode 100644
index 0000000000..d080067216
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.catalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.File;
+import java.io.IOException;
+
+import static java.nio.file.Files.createTempDirectory;
+
+/**
+ * Test-only IcebergCatalogService that exposes a HadoopCatalog rooted in a fresh temp
+ * directory, so tests need no external warehouse.
+ */
+public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
+
+ private final Catalog catalog;
+
+ // Creates the catalog eagerly; the temp directory is not cleaned up here — the JVM/test
+ // framework is expected to dispose of it.
+ public TestHadoopCatalogService() throws IOException {
+ File warehouseLocation = createTempDirectory("metastore").toFile();
+
+ catalog = new HadoopCatalog(new Configuration(), warehouseLocation.getAbsolutePath());
+ }
+
+ @Override
+ public Catalog getCatalog() {
+ return catalog;
+ }
+
+ // NOTE(review): intentionally returns null — callers must tolerate a missing Configuration.
+ // TODO confirm PutIceberg handles a null Configuration from the service.
+ @Override
+ public Configuration getConfiguration() {
+ return null;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
new file mode 100644
index 0000000000..a0a5bf441e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.catalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test-only IcebergCatalogService that exposes a HiveCatalog configured with the given
+ * metastore Thrift URI and warehouse location (typically from the embedded ThriftMetastore).
+ */
+public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService {
+
+ private Catalog catalog;
+
+ public TestHiveCatalogService(String metastoreUri, String warehouseLocation) {
+ initCatalog(metastoreUri, warehouseLocation);
+ }
+
+ // Builds and initializes the HiveCatalog with the two properties it needs: the
+ // metastore URI and the warehouse root.
+ public void initCatalog(String metastoreUri, String warehouseLocation) {
+ catalog = new HiveCatalog();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put(CatalogProperties.URI, metastoreUri);
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+
+ catalog.initialize("hive-catalog", properties);
+ }
+
+ @Override
+ public Catalog getCatalog() {
+ return catalog;
+ }
+
+ // NOTE(review): intentionally returns null — callers must tolerate a missing Configuration.
+ @Override
+ public Configuration getConfiguration() {
+ return null;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java
new file mode 100644
index 0000000000..dc917a358c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.metastore;
+
+import org.apache.derby.jdbc.EmbeddedDriver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
+import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
+import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.nio.file.Files.createTempDirectory;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.AUTO_CREATE_ALL;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CONNECTION_DRIVER;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CONNECT_URL_KEY;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_TXN_MANAGER;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HMS_HANDLER_FORCE_RELOAD_CONF;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.SCHEMA_VERIFICATION;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.THRIFT_URIS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.WAREHOUSE;
+
+/** This class wraps Metastore service core functionalities. */
+class MetastoreCore {
+
+ // Database created in the embedded metastore for tests to use.
+ private final String DB_NAME = "test_metastore";
+
+ private String thriftConnectionUri;
+ private Configuration hiveConf;
+ private HiveMetaStoreClient metastoreClient;
+ private File tempDir;
+ private ExecutorService thriftServer;
+ private TServer server;
+
+ /**
+ * Boots an embedded Hive metastore: creates a temp warehouse dir, initializes an embedded
+ * Derby DB from the bundled schema script, starts a Thrift server on a random free port on a
+ * background thread, then creates the test database through a metastore client.
+ */
+ public void initialize() throws IOException, TException, InvocationTargetException, NoSuchMethodException,
+ IllegalAccessException, NoSuchFieldException, SQLException {
+ thriftServer = Executors.newSingleThreadExecutor();
+ tempDir = createTempDirectory("metastore").toFile();
+ setDerbyLogPath();
+ setupDB("jdbc:derby:" + getDerbyPath() + ";create=true");
+
+ server = thriftServer();
+ thriftServer.submit(() -> server.serve());
+
+ metastoreClient = new HiveMetaStoreClient(hiveConf);
+ metastoreClient.createDatabase(new Database(DB_NAME, "description", getDBPath(), new HashMap<>()));
+ }
+
+ // Stops the client, the Thrift server, its executor, and removes the temp dir.
+ // NOTE(review): metastoreClient is not null-checked — if initialize() failed before the client
+ // was created, shutdown() throws NPE. Also File.delete() on a non-empty dir is a no-op;
+ // TODO confirm temp-dir cleanup is actually expected to succeed here.
+ public void shutdown() {
+
+ metastoreClient.close();
+
+ if (server != null) {
+ server.stop();
+ }
+
+ thriftServer.shutdown();
+
+ if (tempDir != null) {
+ tempDir.delete();
+ }
+ }
+
+ // Builds the HiveConf for the embedded metastore: Thrift URI on the given port, file-based
+ // warehouse in the temp dir, embedded Derby backend, schema checks disabled (schema is loaded
+ // manually by setupDB), and connection pooling off for the single-JVM test setup.
+ private HiveConf hiveConf(int port) {
+ thriftConnectionUri = "thrift://localhost:" + port;
+
+ final HiveConf hiveConf = new HiveConf(new Configuration(), this.getClass());
+ hiveConf.set(THRIFT_URIS.getVarname(), thriftConnectionUri);
+ hiveConf.set(WAREHOUSE.getVarname(), "file:" + tempDir.getAbsolutePath());
+ hiveConf.set(WAREHOUSE.getHiveName(), "file:" + tempDir.getAbsolutePath());
+ hiveConf.set(CONNECTION_DRIVER.getVarname(), EmbeddedDriver.class.getName());
+ hiveConf.set(CONNECT_URL_KEY.getVarname(), "jdbc:derby:" + getDerbyPath() + ";create=true");
+ hiveConf.set(AUTO_CREATE_ALL.getVarname(), "false");
+ hiveConf.set(SCHEMA_VERIFICATION.getVarname(), "false");
+ hiveConf.set(HIVE_TXN_MANAGER.getVarname(), "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+ hiveConf.set(COMPACTOR_INITIATOR_ON.getVarname(), "true");
+ hiveConf.set(COMPACTOR_WORKER_THREADS.getVarname(), "1");
+ hiveConf.set(HIVE_SUPPORT_CONCURRENCY.getVarname(), "true");
+ hiveConf.setBoolean("hcatalog.hive.client.cache.disabled", true);
+
+
+ hiveConf.set(CONNECTION_POOLING_TYPE.getVarname(), "NONE");
+ hiveConf.set(HMS_HANDLER_FORCE_RELOAD_CONF.getVarname(), "true");
+
+ return hiveConf;
+ }
+
+ // Redirects Derby's error log into the temp dir so it does not pollute the working directory.
+ private void setDerbyLogPath() throws IOException {
+ final String derbyLog = Files.createTempFile(tempDir.toPath(), "derby", ".log").toString();
+ System.setProperty("derby.stream.error.file", derbyLog);
+ }
+
+ private String getDerbyPath() {
+ return new File(tempDir, "metastore_db").getPath();
+ }
+
+ // Creates the Thrift server: binds a keep-alive socket on an ephemeral port (port 0), then
+ // builds the retrying HMS handler and a small thread-pool server around it.
+ private TServer thriftServer() throws TTransportException, MetaException, InvocationTargetException,
+ NoSuchMethodException, IllegalAccessException, NoSuchFieldException {
+ final TServerSocketKeepAlive socket = new TServerSocketKeepAlive(new TServerSocket(0));
+ hiveConf = hiveConf(socket.getServerSocket().getLocalPort());
+ final HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", hiveConf);
+ final IHMSHandler handler = RetryingHMSHandler.getProxy(hiveConf, baseHandler, true);
+ final TTransportFactory transportFactory = new TTransportFactory();
+ final TSetIpAddressProcessor<IHMSHandler> processor = new TSetIpAddressProcessor<>(handler);
+
+ TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket)
+ .processor(processor)
+ .transportFactory(transportFactory)
+ .protocolFactory(new TBinaryProtocol.Factory())
+ .minWorkerThreads(3)
+ .maxWorkerThreads(5);
+
+ return new TThreadPoolServer(args);
+ }
+
+ // Loads the bundled Hive 3.2.0 Derby schema script into the freshly created database.
+ // NOTE(review): neither the Connection nor the Reader is closed — resource leak; tolerable in
+ // a short-lived test JVM but worth fixing with try-with-resources.
+ // NOTE(review): getResource(...).getFile() breaks if resources are packaged in a jar —
+ // presumably only run from the exploded test classpath; TODO confirm.
+ private void setupDB(String dbURL) throws SQLException, IOException {
+ final Connection connection = DriverManager.getConnection(dbURL);
+ ScriptRunner scriptRunner = new ScriptRunner(connection);
+
+ final URL initScript = getClass().getClassLoader().getResource("hive-schema-3.2.0.derby.sql");
+ final Reader reader = new BufferedReader(new FileReader(initScript.getFile()));
+ scriptRunner.runScript(reader);
+ }
+
+ private String getDBPath() {
+ return Paths.get(tempDir.getAbsolutePath(), DB_NAME + ".db").toAbsolutePath().toString();
+ }
+
+ public String getThriftConnectionUri() {
+ return thriftConnectionUri;
+ }
+
+ public String getWarehouseLocation() {
+ return tempDir.getAbsolutePath();
+ }
+
+}
+
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java
new file mode 100644
index 0000000000..d1e7e1808e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.metastore;
+
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.io.Reader;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
/** This class is responsible for metastore init script processing and execution. */
public class ScriptRunner {

    private static final String DEFAULT_DELIMITER = ";";

    private final Connection connection;

    /**
     * @param connection open JDBC connection the script statements run on; auto-commit is
     *                   enabled so each DDL statement is applied immediately
     * @throws SQLException if auto-commit cannot be enabled
     */
    public ScriptRunner(Connection connection) throws SQLException {
        this.connection = connection;
        this.connection.setAutoCommit(true);
    }

    /**
     * Reads the script line by line, accumulating lines into one statement until the
     * delimiter ({@code ;}) is reached, then executes each statement on the connection.
     *
     * @param reader source of the SQL script
     * @throws IOException if the script cannot be read
     * @throws SQLException if a statement fails to execute
     */
    public void runScript(Reader reader) throws IOException, SQLException {
        try {
            StringBuilder command = new StringBuilder();
            LineNumberReader lineReader = new LineNumberReader(reader);
            String line;
            while ((line = lineReader.readLine()) != null) {
                String trimmedLine = line.trim();
                if (trimmedLine.isEmpty() || trimmedLine.startsWith("--") || trimmedLine.startsWith("//")) {
                    continue; //Skip comment line
                } else if (trimmedLine.endsWith(getDelimiter())) {
                    // Statement complete: strip the delimiter and execute.
                    command.append(line, 0, line.lastIndexOf(getDelimiter()));
                    command.append(" ");
                    // try-with-resources: the original leaked the Statement when execute() threw
                    try (Statement statement = connection.createStatement()) {
                        statement.execute(command.toString());
                        connection.commit();
                    }
                    command = new StringBuilder();
                } else {
                    // Partial statement: keep accumulating.
                    command.append(line);
                    command.append(" ");
                }
            }
        } catch (IOException | SQLException e) {
            throw e; // propagate known checked failures unchanged
        } catch (Exception e) {
            throw new RuntimeException("Error running metastore init script.", e);
        } finally {
            // Best-effort cleanup of any in-flight transaction, matching original behavior.
            connection.rollback();
        }
    }

    private String getDelimiter() {
        return DEFAULT_DELIMITER;
    }
}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java
new file mode 100644
index 0000000000..910c2df176
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.metastore;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/** A JUnit Extension that creates a Hive Metastore Thrift service backed by a Hive Metastore using an in-memory Derby database. */
+public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
+
+ private final MetastoreCore metastoreCore;
+
+ public ThriftMetastore() {
+ metastoreCore = new MetastoreCore();
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ metastoreCore.initialize();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) {
+ metastoreCore.shutdown();
+ }
+
+ public String getThriftConnectionUri() {
+ return metastoreCore.getThriftConnectionUri();
+ }
+
+ public String getWarehouseLocation() {
+ return metastoreCore.getWarehouseLocation();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java
new file mode 100644
index 0000000000..a70e368033
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.util;
+
import com.google.common.collect.Lists;
import org.apache.commons.lang.Validate;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+
+public class IcebergTestUtils {
+
+ /**
+ * Validates whether the table contains the expected records. The results should be sorted by a unique key, so we do not end up with flaky tests.
+ *
+ * @param table The table we should read the records from
+ * @param expected The expected list of Records
+ * @param sortBy The column position by which we will sort
+ * @throws IOException Exceptions when reading the table data
+ */
+ public static void validateData(Table table, List<Record> expected, int sortBy) throws IOException {
+ List<Record> records = Lists.newArrayListWithExpectedSize(expected.size());
+ try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+ iterable.forEach(records::add);
+ }
+
+ validateData(expected, records, sortBy);
+ }
+
+ /**
+ * Validates whether the 2 sets of records are the same. The results should be sorted by a unique key, so we do not end up with flaky tests.
+ *
+ * @param expected The expected list of Records
+ * @param actual The actual list of Records
+ * @param sortBy The column position by which we will sort
+ */
+ public static void validateData(List<Record> expected, List<Record> actual, int sortBy) {
+ List<Record> sortedExpected = Lists.newArrayList(expected);
+ List<Record> sortedActual = Lists.newArrayList(actual);
+ // Sort based on the specified field
+ sortedExpected.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
+ sortedActual.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
+
+ assertEquals(sortedExpected.size(), sortedActual.size());
+ for (int i = 0; i < expected.size(); ++i) {
+ assertEquals(sortedExpected.get(i), sortedActual.get(i));
+ }
+ }
+
+ /**
+ * Validates the number of files under a {@link Table}
+ *
+ * @param tableLocation The location of table we are checking
+ * @param numberOfDataFiles The expected number of data files (TABLE_LOCATION/data/*)
+ */
+ public static void validateNumberOfDataFiles(String tableLocation, int numberOfDataFiles) throws IOException {
+ List<Path> dataFiles = Files.walk(Paths.get(tableLocation + "/data"))
+ .filter(Files::isRegularFile)
+ .filter(path -> !path.getFileName().toString().startsWith("."))
+ .collect(Collectors.toList());
+
+ assertEquals(numberOfDataFiles, dataFiles.size());
+ }
+
+ public static void validatePartitionFolders(String tableLocation, List<String> partitionPaths) {
+ for (String partitionPath : partitionPaths) {
+ Path path = Paths.get(tableLocation + "/data/" + partitionPath);
+ assertTrue("The expected path doesn't exists: " + path, Files.exists(path));
+ }
+ }
+
+ public static class RecordsBuilder {
+
+ private final List<Record> records = new ArrayList<>();
+ private final Schema schema;
+
+ private RecordsBuilder(Schema schema) {
+ this.schema = schema;
+ }
+
+ public RecordsBuilder add(Object... values) {
+ Validate.isTrue(schema.columns().size() == values.length, "Number of provided values and schema length should be equal.");
+
+ GenericRecord record = GenericRecord.create(schema);
+
+ for (int i = 0; i < values.length; i++) {
+ record.set(i, values[i]);
+ }
+
+ records.add(record);
+ return this;
+ }
+
+ public List<Record> build() {
+ return Collections.unmodifiableList(records);
+ }
+
+ public static RecordsBuilder newInstance(Schema schema) {
+ return new RecordsBuilder(schema);
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/date.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/date.avsc
new file mode 100644
index 0000000000..6fc6010b78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/date.avsc
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "nifi",
+ "name": "data_types",
+ "type": "record",
+ "fields": [
+ {
+ "name" : "timeMicros",
+ "type": {
+ "type": "long",
+ "logicalType": "time-micros"
+ }
+ },
+ {
+ "name" : "timestampMicros",
+ "type": {
+ "type" : "long",
+ "logicalType" : "timestamp-micros"
+ }
+ },
+ {
+ "name" : "date",
+ "type": {
+ "type" : "int",
+ "logicalType" : "date"
+ }
+ }
+ ]
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-3.2.0.derby.sql b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-3.2.0.derby.sql
new file mode 100644
index 0000000000..cbc7c935ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-3.2.0.derby.sql
@@ -0,0 +1,738 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Timestamp: 2011-09-22 15:32:02.024
+-- Source database is: /home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb
+-- Connection URL is: jdbc:derby:/home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb
+-- Specified schema is: APP
+-- appendLogs: false
+
+-- ----------------------------------------------
+-- DDL Statements for functions
+-- ----------------------------------------------
+
+CREATE FUNCTION "APP"."NUCLEUS_ASCII" (C CHAR(1)) RETURNS INTEGER LANGUAGE JAVA PARAMETER STYLE JAVA READS SQL DATA CALLED ON NULL INPUT EXTERNAL NAME 'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.ascii' ;
+
+CREATE FUNCTION "APP"."NUCLEUS_MATCHES" (TEXT VARCHAR(8000),PATTERN VARCHAR(8000)) RETURNS INTEGER LANGUAGE JAVA PARAMETER STYLE JAVA READS SQL DATA CALLED ON NULL INPUT EXTERNAL NAME 'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.matches' ;
+
+-- ----------------------------------------------
+-- DDL Statements for tables
+-- ----------------------------------------------
+CREATE TABLE "APP"."DBS" (
+ "DB_ID" BIGINT NOT NULL,
+ "DESC" VARCHAR(4000),
+ "DB_LOCATION_URI" VARCHAR(4000) NOT NULL,
+ "NAME" VARCHAR(128),
+ "OWNER_NAME" VARCHAR(128),
+ "OWNER_TYPE" VARCHAR(10),
+ "CTLG_NAME" VARCHAR(256) NOT NULL DEFAULT 'hive',
+ "CREATE_TIME" INTEGER
+);
+
+CREATE TABLE "APP"."TBL_PRIVS" ("TBL_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "TBL_PRIV" VARCHAR(128), "TBL_ID" BIGINT, "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."DATABASE_PARAMS" ("DB_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(180) NOT NULL, "PARAM_VALUE" VARCHAR(4000));
+
+CREATE TABLE "APP"."TBL_COL_PRIVS" ("TBL_COLUMN_GRANT_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "TBL_COL_PRIV" VARCHAR(128), "TBL_ID" BIGINT, "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."SERDE_PARAMS" ("SERDE_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."COLUMNS_V2" ("CD_ID" BIGINT NOT NULL, "COMMENT" VARCHAR(4000), "COLUMN_NAME" VARCHAR(767) NOT NULL, "TYPE_NAME" CLOB, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SORT_COLS" ("SD_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "ORDER" INTEGER NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."CDS" ("CD_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."PARTITION_KEY_VALS" ("PART_ID" BIGINT NOT NULL, "PART_KEY_VAL" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."DB_PRIVS" ("DB_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "DB_ID" BIGINT, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "DB_PRIV" VARCHAR(128), "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."IDXS" ("INDEX_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "DEFERRED_REBUILD" CHAR(1) NOT NULL, "INDEX_HANDLER_CLASS" VARCHAR(4000), "INDEX_NAME" VARCHAR(128), "INDEX_TBL_ID" BIGINT, "LAST_ACCESS_TIME" INTEGER NOT NULL, "ORIG_TBL_ID" BIGINT, "SD_ID" BIGINT);
+
+CREATE TABLE "APP"."INDEX_PARAMS" ("INDEX_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" VARCHAR(4000));
+
+CREATE TABLE "APP"."PARTITIONS" ("PART_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "LAST_ACCESS_TIME" INTEGER NOT NULL, "PART_NAME" VARCHAR(767), "SD_ID" BIGINT, "TBL_ID" BIGINT);
+
+CREATE TABLE "APP"."SERDES" ("SERDE_ID" BIGINT NOT NULL, "NAME" VARCHAR(128), "SLIB" VARCHAR(4000), "DESCRIPTION" VARCHAR(4000), "SERIALIZER_CLASS" VARCHAR(4000), "DESERIALIZER_CLASS" VARCHAR(4000), SERDE_TYPE INTEGER);
+
+CREATE TABLE "APP"."PART_PRIVS" ("PART_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PART_ID" BIGINT, "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "PART_PRIV" VARCHAR(128), "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."ROLE_MAP" ("ROLE_GRANT_ID" BIGINT NOT NULL, "ADD_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "ROLE_ID" BIGINT);
+
+CREATE TABLE "APP"."TYPES" ("TYPES_ID" BIGINT NOT NULL, "TYPE_NAME" VARCHAR(128), "TYPE1" VARCHAR(767), "TYPE2" VARCHAR(767));
+
+CREATE TABLE "APP"."GLOBAL_PRIVS" ("USER_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "USER_PRIV" VARCHAR(128), "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."PARTITION_PARAMS" ("PART_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."PARTITION_EVENTS" (
+ "PART_NAME_ID" BIGINT NOT NULL,
+ "CAT_NAME" VARCHAR(256),
+ "DB_NAME" VARCHAR(128),
+ "EVENT_TIME" BIGINT NOT NULL,
+ "EVENT_TYPE" INTEGER NOT NULL,
+ "PARTITION_NAME" VARCHAR(767),
+ "TBL_NAME" VARCHAR(256)
+);
+
+CREATE TABLE "APP"."COLUMNS" ("SD_ID" BIGINT NOT NULL, "COMMENT" VARCHAR(256), "COLUMN_NAME" VARCHAR(128) NOT NULL, "TYPE_NAME" VARCHAR(4000) NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."ROLES" ("ROLE_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "OWNER_NAME" VARCHAR(128), "ROLE_NAME" VARCHAR(128));
+
+CREATE TABLE "APP"."TBLS" ("TBL_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT NULL, "DB_ID" BIGINT, "LAST_ACCESS_TIME" INTEGER NOT NULL, "OWNER" VARCHAR(767), "OWNER_TYPE" VARCHAR(10), "RETENTION" INTEGER NOT NULL, "SD_ID" BIGINT, "TBL_NAME" VARCHAR(256), "TBL_TYPE" VARCHAR(128), "VIEW_EXPANDED_TEXT" LONG VARCHAR, "VIEW_ORIGINAL_TEXT" LONG VARCHAR, "IS_REWRITE_ENABLED" CHAR(1) NOT NULL DEFAULT 'N');
+
+CREATE TABLE "APP"."PARTITION_KEYS" ("TBL_ID" BIGINT NOT NULL, "PKEY_COMMENT" VARCHAR(4000), "PKEY_NAME" VARCHAR(128) NOT NULL, "PKEY_TYPE" VARCHAR(767) NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."PART_COL_PRIVS" ("PART_COLUMN_GRANT_ID" BIGINT NOT NULL, "COLUMN_NAME" VARCHAR(767), "CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PART_ID" BIGINT, "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" VARCHAR(128), "PART_COL_PRIV" VARCHAR(128), "AUTHORIZER" VARCHAR(128));
+
+CREATE TABLE "APP"."SDS" ("SD_ID" BIGINT NOT NULL, "INPUT_FORMAT" VARCHAR(4000), "IS_COMPRESSED" CHAR(1) NOT NULL, "LOCATION" VARCHAR(4000), "NUM_BUCKETS" INTEGER NOT NULL, "OUTPUT_FORMAT" VARCHAR(4000), "SERDE_ID" BIGINT, "CD_ID" BIGINT, "IS_STOREDASSUBDIRECTORIES" CHAR(1) NOT NULL);
+
+CREATE TABLE "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME" VARCHAR(256) NOT NULL, "NEXT_VAL" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."TAB_COL_STATS"(
+ "CAT_NAME" VARCHAR(256) NOT NULL,
+ "DB_NAME" VARCHAR(128) NOT NULL,
+ "TABLE_NAME" VARCHAR(256) NOT NULL,
+ "COLUMN_NAME" VARCHAR(767) NOT NULL,
+ "COLUMN_TYPE" VARCHAR(128) NOT NULL,
+ "LONG_LOW_VALUE" BIGINT,
+ "LONG_HIGH_VALUE" BIGINT,
+ "DOUBLE_LOW_VALUE" DOUBLE,
+ "DOUBLE_HIGH_VALUE" DOUBLE,
+ "BIG_DECIMAL_LOW_VALUE" VARCHAR(4000),
+ "BIG_DECIMAL_HIGH_VALUE" VARCHAR(4000),
+ "NUM_DISTINCTS" BIGINT,
+ "NUM_NULLS" BIGINT NOT NULL,
+ "AVG_COL_LEN" DOUBLE,
+ "MAX_COL_LEN" BIGINT,
+ "NUM_TRUES" BIGINT,
+ "NUM_FALSES" BIGINT,
+ "LAST_ANALYZED" BIGINT,
+ "CS_ID" BIGINT NOT NULL,
+ "TBL_ID" BIGINT NOT NULL,
+ "BIT_VECTOR" BLOB
+);
+
+CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, "BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."TYPE_FIELDS" ("TYPE_NAME" BIGINT NOT NULL, "COMMENT" VARCHAR(256), "FIELD_NAME" VARCHAR(128) NOT NULL, "FIELD_TYPE" VARCHAR(767) NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."NUCLEUS_TABLES" ("CLASS_NAME" VARCHAR(128) NOT NULL, "TABLE_NAME" VARCHAR(128) NOT NULL, "TYPE" VARCHAR(4) NOT NULL, "OWNER" VARCHAR(2) NOT NULL, "VERSION" VARCHAR(20) NOT NULL, "INTERFACE_NAME" VARCHAR(256) DEFAULT NULL);
+
+CREATE TABLE "APP"."SD_PARAMS" ("SD_ID" BIGINT NOT NULL, "PARAM_KEY" VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_STRING_LIST_VALUES" ("STRING_LIST_ID" BIGINT NOT NULL, "STRING_LIST_VALUE" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_COL_NAMES" ("SD_ID" BIGINT NOT NULL, "SKEWED_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ("SD_ID" BIGINT NOT NULL, "STRING_LIST_ID_KID" BIGINT NOT NULL, "LOCATION" VARCHAR(4000));
+
+CREATE TABLE "APP"."SKEWED_VALUES" ("SD_ID_OID" BIGINT NOT NULL, "STRING_LIST_ID_EID" BIGINT NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."MASTER_KEYS" ("KEY_ID" INTEGER NOT NULL generated always as identity (start with 1), "MASTER_KEY" VARCHAR(767));
+
+CREATE TABLE "APP"."DELEGATION_TOKENS" ( "TOKEN_IDENT" VARCHAR(767) NOT NULL, "TOKEN" VARCHAR(767));
+
+CREATE TABLE "APP"."PART_COL_STATS"(
+ "CAT_NAME" VARCHAR(256) NOT NULL,
+ "DB_NAME" VARCHAR(128) NOT NULL,
+ "TABLE_NAME" VARCHAR(256) NOT NULL,
+ "PARTITION_NAME" VARCHAR(767) NOT NULL,
+ "COLUMN_NAME" VARCHAR(767) NOT NULL,
+ "COLUMN_TYPE" VARCHAR(128) NOT NULL,
+ "LONG_LOW_VALUE" BIGINT,
+ "LONG_HIGH_VALUE" BIGINT,
+ "DOUBLE_LOW_VALUE" DOUBLE,
+ "DOUBLE_HIGH_VALUE" DOUBLE,
+ "BIG_DECIMAL_LOW_VALUE" VARCHAR(4000),
+ "BIG_DECIMAL_HIGH_VALUE" VARCHAR(4000),
+ "NUM_DISTINCTS" BIGINT,
+ "BIT_VECTOR" BLOB,
+ "NUM_NULLS" BIGINT NOT NULL,
+ "AVG_COL_LEN" DOUBLE,
+ "MAX_COL_LEN" BIGINT,
+ "NUM_TRUES" BIGINT,
+ "NUM_FALSES" BIGINT,
+ "LAST_ANALYZED" BIGINT,
+ "CS_ID" BIGINT NOT NULL,
+ "PART_ID" BIGINT NOT NULL
+);
+
+CREATE TABLE "APP"."VERSION" ("VER_ID" BIGINT NOT NULL, "SCHEMA_VERSION" VARCHAR(127) NOT NULL, "VERSION_COMMENT" VARCHAR(255));
+
+CREATE TABLE "APP"."FUNCS" ("FUNC_ID" BIGINT NOT NULL, "CLASS_NAME" VARCHAR(4000), "CREATE_TIME" INTEGER NOT NULL, "DB_ID" BIGINT, "FUNC_NAME" VARCHAR(128), "FUNC_TYPE" INTEGER NOT NULL, "OWNER_NAME" VARCHAR(128), "OWNER_TYPE" VARCHAR(10));
+
+CREATE TABLE "APP"."FUNC_RU" ("FUNC_ID" BIGINT NOT NULL, "RESOURCE_TYPE" INTEGER NOT NULL, "RESOURCE_URI" VARCHAR(4000), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."NOTIFICATION_LOG" (
+ "NL_ID" BIGINT NOT NULL,
+ "CAT_NAME" VARCHAR(256),
+ "DB_NAME" VARCHAR(128),
+ "EVENT_ID" BIGINT NOT NULL,
+ "EVENT_TIME" INTEGER NOT NULL,
+ "EVENT_TYPE" VARCHAR(32) NOT NULL,
+ "MESSAGE" CLOB,
+ "TBL_NAME" VARCHAR(256),
+ "MESSAGE_FORMAT" VARCHAR(16)
+);
+
+CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID" BIGINT NOT NULL, "NEXT_EVENT_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."KEY_CONSTRAINTS" ("CHILD_CD_ID" BIGINT, "CHILD_INTEGER_IDX" INTEGER, "CHILD_TBL_ID" BIGINT, "PARENT_CD_ID" BIGINT , "PARENT_INTEGER_IDX" INTEGER, "PARENT_TBL_ID" BIGINT NOT NULL, "POSITION" BIGINT NOT NULL, "CONSTRAINT_NAME" VARCHAR(400) NOT NULL, "CONSTRAINT_TYPE" SMALLINT NOT NULL, "UPDATE_RULE" SMALLINT, "DELETE_RULE" SMALLINT, "ENABLE_VALIDATE_RELY" SMALLINT NOT NULL, "DEFAULT_VALUE" VARCHAR(400));
+
+CREATE TABLE "APP"."METASTORE_DB_PROPERTIES" ("PROPERTY_KEY" VARCHAR(255) NOT NULL, "PROPERTY_VALUE" VARCHAR(1000) NOT NULL, "DESCRIPTION" VARCHAR(1000));
+
+CREATE TABLE "APP"."WM_RESOURCEPLAN" (RP_ID BIGINT NOT NULL, NAME VARCHAR(128) NOT NULL, QUERY_PARALLELISM INTEGER, STATUS VARCHAR(20) NOT NULL, DEFAULT_POOL_ID BIGINT);
+
+CREATE TABLE "APP"."WM_POOL" (POOL_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, PATH VARCHAR(1024) NOT NULL, ALLOC_FRACTION DOUBLE, QUERY_PARALLELISM INTEGER, SCHEDULING_POLICY VARCHAR(1024));
+
+CREATE TABLE "APP"."WM_TRIGGER" (TRIGGER_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, NAME VARCHAR(128) NOT NULL, TRIGGER_EXPRESSION VARCHAR(1024), ACTION_EXPRESSION VARCHAR(1024), IS_IN_UNMANAGED INTEGER NOT NULL DEFAULT 0);
+
+CREATE TABLE "APP"."WM_POOL_TO_TRIGGER" (POOL_ID BIGINT NOT NULL, TRIGGER_ID BIGINT NOT NULL);
+
+CREATE TABLE "APP"."WM_MAPPING" (MAPPING_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, ENTITY_TYPE VARCHAR(128) NOT NULL, ENTITY_NAME VARCHAR(128) NOT NULL, POOL_ID BIGINT, ORDERING INTEGER);
+
+CREATE TABLE "APP"."MV_CREATION_METADATA" (
+ "MV_CREATION_METADATA_ID" BIGINT NOT NULL,
+ "CAT_NAME" VARCHAR(256) NOT NULL,
+ "DB_NAME" VARCHAR(128) NOT NULL,
+ "TBL_NAME" VARCHAR(256) NOT NULL,
+ "TXN_LIST" CLOB,
+ "MATERIALIZATION_TIME" BIGINT NOT NULL
+);
+
+CREATE TABLE "APP"."MV_TABLES_USED" (
+ "MV_CREATION_METADATA_ID" BIGINT NOT NULL,
+ "TBL_ID" BIGINT NOT NULL
+);
+
+CREATE TABLE "APP"."CTLGS" (
+ "CTLG_ID" BIGINT NOT NULL,
+ "NAME" VARCHAR(256) UNIQUE,
+ "DESC" VARCHAR(4000),
+ "LOCATION_URI" VARCHAR(4000) NOT NULL,
+ "CREATE_TIME" INTEGER);
+
+-- Insert a default value. The location is TBD. Hive will fix this when it starts
+INSERT INTO "APP"."CTLGS" VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL);
+
+-- ----------------------------------------------
+-- DML Statements
+-- ----------------------------------------------
+
+INSERT INTO "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID", "NEXT_EVENT_ID") SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT "NEXT_EVENT_ID" FROM "APP"."NOTIFICATION_SEQUENCE");
+
+INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1)) tmp_table WHERE NOT EXISTS ( SELECT "NEXT_VAL" FROM "APP"."SEQUENCE_TABLE" WHERE "SEQUENCE_NAME" = 'org.apache.hadoop.hive.metastore.model.MNotificationLog');
+
+-- ----------------------------------------------
+-- DDL Statements for indexes
+-- ----------------------------------------------
+
+CREATE UNIQUE INDEX "APP"."UNIQUEINDEX" ON "APP"."IDXS" ("INDEX_NAME", "ORIG_TBL_ID");
+
+CREATE INDEX "APP"."TABLECOLUMNPRIVILEGEINDEX" ON "APP"."TBL_COL_PRIVS" ("AUTHORIZER", "TBL_ID", "COLUMN_NAME", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "TBL_COL_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."DBPRIVILEGEINDEX" ON "APP"."DB_PRIVS" ("AUTHORIZER", "DB_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "DB_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE INDEX "APP"."PCS_STATS_IDX" ON "APP"."PART_COL_STATS" ("CAT_NAME", "DB_NAME","TABLE_NAME","COLUMN_NAME","PARTITION_NAME");
+
+CREATE INDEX "APP"."TAB_COL_STATS_IDX" ON "APP"."TAB_COL_STATS" ("CAT_NAME", "DB_NAME", "TABLE_NAME", "COLUMN_NAME");
+
+CREATE INDEX "APP"."PARTPRIVILEGEINDEX" ON "APP"."PART_PRIVS" ("AUTHORIZER", "PART_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "PART_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."ROLEENTITYINDEX" ON "APP"."ROLES" ("ROLE_NAME");
+
+CREATE INDEX "APP"."TABLEPRIVILEGEINDEX" ON "APP"."TBL_PRIVS" ("AUTHORIZER", "TBL_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "TBL_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUETABLE" ON "APP"."TBLS" ("TBL_NAME", "DB_ID");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_DATABASE" ON "APP"."DBS" ("NAME", "CTLG_NAME");
+
+CREATE UNIQUE INDEX "APP"."USERROLEMAPINDEX" ON "APP"."ROLE_MAP" ("PRINCIPAL_NAME", "ROLE_ID", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."GLOBALPRIVILEGEINDEX" ON "APP"."GLOBAL_PRIVS" ("AUTHORIZER", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "USER_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_TYPE" ON "APP"."TYPES" ("TYPE_NAME");
+
+CREATE INDEX "APP"."PARTITIONCOLUMNPRIVILEGEINDEX" ON "APP"."PART_COL_PRIVS" ("AUTHORIZER", "PART_ID", "COLUMN_NAME", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "PART_COL_PRIV", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUEPARTITION" ON "APP"."PARTITIONS" ("PART_NAME", "TBL_ID");
+
+CREATE UNIQUE INDEX "APP"."UNIQUEFUNCTION" ON "APP"."FUNCS" ("FUNC_NAME", "DB_ID");
+
+CREATE INDEX "APP"."FUNCS_N49" ON "APP"."FUNCS" ("DB_ID");
+
+CREATE INDEX "APP"."FUNC_RU_N49" ON "APP"."FUNC_RU" ("FUNC_ID");
+
+CREATE INDEX "APP"."CONSTRAINTS_PARENT_TBL_ID_INDEX" ON "APP"."KEY_CONSTRAINTS"("PARENT_TBL_ID");
+
+CREATE INDEX "APP"."CONSTRAINTS_CONSTRAINT_TYPE_INDEX" ON "APP"."KEY_CONSTRAINTS"("CONSTRAINT_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_WM_RESOURCEPLAN" ON "APP"."WM_RESOURCEPLAN" ("NAME");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_WM_POOL" ON "APP"."WM_POOL" ("RP_ID", "PATH");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_WM_TRIGGER" ON "APP"."WM_TRIGGER" ("RP_ID", "NAME");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_WM_MAPPING" ON "APP"."WM_MAPPING" ("RP_ID", "ENTITY_TYPE", "ENTITY_NAME");
+
+CREATE UNIQUE INDEX "APP"."MV_UNIQUE_TABLE" ON "APP"."MV_CREATION_METADATA" ("TBL_NAME", "DB_NAME");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_CATALOG" ON "APP"."CTLGS" ("NAME");
+
+
+-- ----------------------------------------------
+-- DDL Statements for keys
+-- ----------------------------------------------
+
+-- primary/unique
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "IDXS_PK" PRIMARY KEY ("INDEX_ID");
+
+ALTER TABLE "APP"."TBL_COL_PRIVS" ADD CONSTRAINT "TBL_COL_PRIVS_PK" PRIMARY KEY ("TBL_COLUMN_GRANT_ID");
+
+ALTER TABLE "APP"."CDS" ADD CONSTRAINT "SQL110922153006460" PRIMARY KEY ("CD_ID");
+
+ALTER TABLE "APP"."DB_PRIVS" ADD CONSTRAINT "DB_PRIVS_PK" PRIMARY KEY ("DB_GRANT_ID");
+
+ALTER TABLE "APP"."INDEX_PARAMS" ADD CONSTRAINT "INDEX_PARAMS_PK" PRIMARY KEY ("INDEX_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."PARTITION_KEYS" ADD CONSTRAINT "PARTITION_KEY_PK" PRIMARY KEY ("TBL_ID", "PKEY_NAME");
+
+ALTER TABLE "APP"."SEQUENCE_TABLE" ADD CONSTRAINT "SEQUENCE_TABLE_PK" PRIMARY KEY ("SEQUENCE_NAME");
+
+ALTER TABLE "APP"."PART_PRIVS" ADD CONSTRAINT "PART_PRIVS_PK" PRIMARY KEY ("PART_GRANT_ID");
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_PK" PRIMARY KEY ("SD_ID");
+
+ALTER TABLE "APP"."SERDES" ADD CONSTRAINT "SERDES_PK" PRIMARY KEY ("SERDE_ID");
+
+ALTER TABLE "APP"."COLUMNS" ADD CONSTRAINT "COLUMNS_PK" PRIMARY KEY ("SD_ID", "COLUMN_NAME");
+
+ALTER TABLE "APP"."PARTITION_EVENTS" ADD CONSTRAINT "PARTITION_EVENTS_PK" PRIMARY KEY ("PART_NAME_ID");
+
+ALTER TABLE "APP"."TYPE_FIELDS" ADD CONSTRAINT "TYPE_FIELDS_PK" PRIMARY KEY ("TYPE_NAME", "FIELD_NAME");
+
+ALTER TABLE "APP"."ROLES" ADD CONSTRAINT "ROLES_PK" PRIMARY KEY ("ROLE_ID");
+
+ALTER TABLE "APP"."TBL_PRIVS" ADD CONSTRAINT "TBL_PRIVS_PK" PRIMARY KEY ("TBL_GRANT_ID");
+
+ALTER TABLE "APP"."SERDE_PARAMS" ADD CONSTRAINT "SERDE_PARAMS_PK" PRIMARY KEY ("SERDE_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."NUCLEUS_TABLES" ADD CONSTRAINT "NUCLEUS_TABLES_PK" PRIMARY KEY ("CLASS_NAME");
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_PK" PRIMARY KEY ("TBL_ID");
+
+ALTER TABLE "APP"."SD_PARAMS" ADD CONSTRAINT "SD_PARAMS_PK" PRIMARY KEY ("SD_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."DATABASE_PARAMS" ADD CONSTRAINT "DATABASE_PARAMS_PK" PRIMARY KEY ("DB_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_PK" PRIMARY KEY ("DB_ID");
+
+ALTER TABLE "APP"."ROLE_MAP" ADD CONSTRAINT "ROLE_MAP_PK" PRIMARY KEY ("ROLE_GRANT_ID");
+
+ALTER TABLE "APP"."GLOBAL_PRIVS" ADD CONSTRAINT "GLOBAL_PRIVS_PK" PRIMARY KEY ("USER_GRANT_ID");
+
+ALTER TABLE "APP"."BUCKETING_COLS" ADD CONSTRAINT "BUCKETING_COLS_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SORT_COLS" ADD CONSTRAINT "SORT_COLS_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."PARTITION_KEY_VALS" ADD CONSTRAINT "PARTITION_KEY_VALS_PK" PRIMARY KEY ("PART_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."TYPES" ADD CONSTRAINT "TYPES_PK" PRIMARY KEY ("TYPES_ID");
+
+ALTER TABLE "APP"."COLUMNS_V2" ADD CONSTRAINT "SQL110922153006740" PRIMARY KEY ("CD_ID", "COLUMN_NAME");
+
+ALTER TABLE "APP"."PART_COL_PRIVS" ADD CONSTRAINT "PART_COL_PRIVS_PK" PRIMARY KEY ("PART_COLUMN_GRANT_ID");
+
+ALTER TABLE "APP"."PARTITION_PARAMS" ADD CONSTRAINT "PARTITION_PARAMS_PK" PRIMARY KEY ("PART_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."PARTITIONS" ADD CONSTRAINT "PARTITIONS_PK" PRIMARY KEY ("PART_ID");
+
+ALTER TABLE "APP"."TABLE_PARAMS" ADD CONSTRAINT "TABLE_PARAMS_PK" PRIMARY KEY ("TBL_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST" ADD CONSTRAINT "SKEWED_STRING_LIST_PK" PRIMARY KEY ("STRING_LIST_ID");
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" ADD CONSTRAINT "SKEWED_STRING_LIST_VALUES_PK" PRIMARY KEY ("STRING_LIST_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SKEWED_COL_NAMES" ADD CONSTRAINT "SKEWED_COL_NAMES_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_PK" PRIMARY KEY ("SD_ID", "STRING_LIST_ID_KID");
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_PK" PRIMARY KEY ("SD_ID_OID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."TAB_COL_STATS" ADD CONSTRAINT "TAB_COL_STATS_PK" PRIMARY KEY ("CS_ID");
+
+ALTER TABLE "APP"."PART_COL_STATS" ADD CONSTRAINT "PART_COL_STATS_PK" PRIMARY KEY ("CS_ID");
+
+ALTER TABLE "APP"."FUNCS" ADD CONSTRAINT "FUNCS_PK" PRIMARY KEY ("FUNC_ID");
+
+ALTER TABLE "APP"."FUNC_RU" ADD CONSTRAINT "FUNC_RU_PK" PRIMARY KEY ("FUNC_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."NOTIFICATION_LOG" ADD CONSTRAINT "NOTIFICATION_LOG_PK" PRIMARY KEY ("NL_ID");
+
+ALTER TABLE "APP"."NOTIFICATION_SEQUENCE" ADD CONSTRAINT "NOTIFICATION_SEQUENCE_PK" PRIMARY KEY ("NNI_ID");
+
+ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY ("CONSTRAINT_NAME", "POSITION");
+
+ALTER TABLE "APP"."METASTORE_DB_PROPERTIES" ADD CONSTRAINT "PROPERTY_KEY_PK" PRIMARY KEY ("PROPERTY_KEY");
+
+ALTER TABLE "APP"."MV_CREATION_METADATA" ADD CONSTRAINT "MV_CREATION_METADATA_PK" PRIMARY KEY ("MV_CREATION_METADATA_ID");
+
+ALTER TABLE "APP"."CTLGS" ADD CONSTRAINT "CTLG_PK" PRIMARY KEY ("CTLG_ID");
+
+
+-- foreign
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "IDXS_FK1" FOREIGN KEY ("ORIG_TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "IDXS_FK2" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "IDXS_FK3" FOREIGN KEY ("INDEX_TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBL_COL_PRIVS" ADD CONSTRAINT "TBL_COL_PRIVS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DB_PRIVS" ADD CONSTRAINT "DB_PRIVS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."INDEX_PARAMS" ADD CONSTRAINT "INDEX_PARAMS_FK1" FOREIGN KEY ("INDEX_ID") REFERENCES "APP"."IDXS" ("INDEX_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITION_KEYS" ADD CONSTRAINT "PARTITION_KEYS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PART_PRIVS" ADD CONSTRAINT "PART_PRIVS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_FK1" FOREIGN KEY ("SERDE_ID") REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_FK2" FOREIGN KEY ("CD_ID") REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."COLUMNS" ADD CONSTRAINT "COLUMNS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TYPE_FIELDS" ADD CONSTRAINT "TYPE_FIELDS_FK1" FOREIGN KEY ("TYPE_NAME") REFERENCES "APP"."TYPES" ("TYPES_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBL_PRIVS" ADD CONSTRAINT "TBL_PRIVS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SERDE_PARAMS" ADD CONSTRAINT "SERDE_PARAMS_FK1" FOREIGN KEY ("SERDE_ID") REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_FK2" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_FK1" FOREIGN KEY ("CTLG_NAME") REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SD_PARAMS" ADD CONSTRAINT "SD_PARAMS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DATABASE_PARAMS" ADD CONSTRAINT "DATABASE_PARAMS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."ROLE_MAP" ADD CONSTRAINT "ROLE_MAP_FK1" FOREIGN KEY ("ROLE_ID") REFERENCES "APP"."ROLES" ("ROLE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."BUCKETING_COLS" ADD CONSTRAINT "BUCKETING_COLS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SORT_COLS" ADD CONSTRAINT "SORT_COLS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITION_KEY_VALS" ADD CONSTRAINT "PARTITION_KEY_VALS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."COLUMNS_V2" ADD CONSTRAINT "COLUMNS_V2_FK1" FOREIGN KEY ("CD_ID") REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PART_COL_PRIVS" ADD CONSTRAINT "PART_COL_PRIVS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITION_PARAMS" ADD CONSTRAINT "PARTITION_PARAMS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITIONS" ADD CONSTRAINT "PARTITIONS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PARTITIONS" ADD CONSTRAINT "PARTITIONS_FK2" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TABLE_PARAMS" ADD CONSTRAINT "TABLE_PARAMS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" ADD CONSTRAINT "SKEWED_STRING_LIST_VALUES_FK1" FOREIGN KEY ("STRING_LIST_ID") REFERENCES "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_NAMES" ADD CONSTRAINT "SKEWED_COL_NAMES_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_FK2" FOREIGN KEY ("STRING_LIST_ID_KID") REFERENCES "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_FK1" FOREIGN KEY ("SD_ID_OID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_FK2" FOREIGN KEY ("STRING_LIST_ID_EID") REFERENCES "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TAB_COL_STATS" ADD CONSTRAINT "TAB_COL_STATS_FK" FOREIGN KEY ("TBL_ID") REFERENCES TBLS("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."PART_COL_STATS" ADD CONSTRAINT "PART_COL_STATS_FK" FOREIGN KEY ("PART_ID") REFERENCES PARTITIONS("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."VERSION" ADD CONSTRAINT "VERSION_PK" PRIMARY KEY ("VER_ID");
+
+ALTER TABLE "APP"."FUNCS" ADD CONSTRAINT "FUNCS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."FUNC_RU" ADD CONSTRAINT "FUNC_RU_FK1" FOREIGN KEY ("FUNC_ID") REFERENCES "APP"."FUNCS" ("FUNC_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_RESOURCEPLAN" ADD CONSTRAINT "WM_RESOURCEPLAN_PK" PRIMARY KEY ("RP_ID");
+
+ALTER TABLE "APP"."WM_POOL" ADD CONSTRAINT "WM_POOL_PK" PRIMARY KEY ("POOL_ID");
+
+ALTER TABLE "APP"."WM_POOL" ADD CONSTRAINT "WM_POOL_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_RESOURCEPLAN" ADD CONSTRAINT "WM_RESOURCEPLAN_FK1" FOREIGN KEY ("DEFAULT_POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_TRIGGER" ADD CONSTRAINT "WM_TRIGGER_PK" PRIMARY KEY ("TRIGGER_ID");
+
+ALTER TABLE "APP"."WM_TRIGGER" ADD CONSTRAINT "WM_TRIGGER_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" ADD CONSTRAINT "WM_POOL_TO_TRIGGER_FK1" FOREIGN KEY ("POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" ADD CONSTRAINT "WM_POOL_TO_TRIGGER_FK2" FOREIGN KEY ("TRIGGER_ID") REFERENCES "APP"."WM_TRIGGER" ("TRIGGER_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_MAPPING" ADD CONSTRAINT "WM_MAPPING_PK" PRIMARY KEY ("MAPPING_ID");
+
+ALTER TABLE "APP"."WM_MAPPING" ADD CONSTRAINT "WM_MAPPING_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."WM_MAPPING" ADD CONSTRAINT "WM_MAPPING_FK2" FOREIGN KEY ("POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_FK1" FOREIGN KEY ("MV_CREATION_METADATA_ID") REFERENCES "APP"."MV_CREATION_METADATA" ("MV_CREATION_METADATA_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_FK2" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_CTLG_FK" FOREIGN KEY ("CTLG_NAME") REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+-- ----------------------------------------------
+-- DDL Statements for checks
+-- ----------------------------------------------
+
+ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "SQL110318025504980" CHECK (DEFERRED_REBUILD IN ('Y','N'));
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED IN ('Y','N'));
+
+-- ----------------------------
+-- Transaction and Lock Tables
+-- ----------------------------
+CREATE TABLE TXNS (
+ TXN_ID bigint PRIMARY KEY,
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED bigint NOT NULL,
+ TXN_LAST_HEARTBEAT bigint NOT NULL,
+ TXN_USER varchar(128) NOT NULL,
+ TXN_HOST varchar(128) NOT NULL,
+ TXN_AGENT_INFO varchar(128),
+ TXN_META_INFO varchar(128),
+ TXN_HEARTBEAT_COUNT integer,
+ TXN_TYPE integer
+);
+
+CREATE TABLE TXN_COMPONENTS (
+ TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID),
+ TC_DATABASE varchar(128) NOT NULL,
+ TC_TABLE varchar(128),
+ TC_PARTITION varchar(767),
+ TC_OPERATION_TYPE char(1) NOT NULL,
+ TC_WRITEID bigint
+);
+
+CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+ CTC_TXNID bigint NOT NULL,
+ CTC_DATABASE varchar(128) NOT NULL,
+ CTC_TABLE varchar(256),
+ CTC_PARTITION varchar(767),
+ CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ CTC_WRITEID bigint,
+ CTC_UPDATE_DELETE char(1) NOT NULL
+);
+
+CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
+
+CREATE TABLE NEXT_TXN_ID (
+ NTXN_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+ HL_LOCK_EXT_ID bigint NOT NULL,
+ HL_LOCK_INT_ID bigint NOT NULL,
+ HL_TXNID bigint NOT NULL,
+ HL_DB varchar(128) NOT NULL,
+ HL_TABLE varchar(128),
+ HL_PARTITION varchar(767),
+ HL_LOCK_STATE char(1) NOT NULL,
+ HL_LOCK_TYPE char(1) NOT NULL,
+ HL_LAST_HEARTBEAT bigint NOT NULL,
+ HL_ACQUIRED_AT bigint,
+ HL_USER varchar(128) NOT NULL,
+ HL_HOST varchar(128) NOT NULL,
+ HL_HEARTBEAT_COUNT integer,
+ HL_AGENT_INFO varchar(128),
+ HL_BLOCKEDBY_EXT_ID bigint,
+ HL_BLOCKEDBY_INT_ID bigint,
+ PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+);
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+ NL_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+ CQ_ID bigint PRIMARY KEY,
+ CQ_DATABASE varchar(128) NOT NULL,
+ CQ_TABLE varchar(128) NOT NULL,
+ CQ_PARTITION varchar(767),
+ CQ_STATE char(1) NOT NULL,
+ CQ_TYPE char(1) NOT NULL,
+ CQ_TBLPROPERTIES varchar(2048),
+ CQ_WORKER_ID varchar(128),
+ CQ_START bigint,
+ CQ_RUN_AS varchar(128),
+ CQ_HIGHEST_WRITE_ID bigint,
+ CQ_META_INFO varchar(2048) for bit data,
+ CQ_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+ NCQ_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+ CC_ID bigint PRIMARY KEY,
+ CC_DATABASE varchar(128) NOT NULL,
+ CC_TABLE varchar(128) NOT NULL,
+ CC_PARTITION varchar(767),
+ CC_STATE char(1) NOT NULL,
+ CC_TYPE char(1) NOT NULL,
+ CC_TBLPROPERTIES varchar(2048),
+ CC_WORKER_ID varchar(128),
+ CC_START bigint,
+ CC_END bigint,
+ CC_RUN_AS varchar(128),
+ CC_HIGHEST_WRITE_ID bigint,
+ CC_META_INFO varchar(2048) for bit data,
+ CC_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE AUX_TABLE (
+ MT_KEY1 varchar(128) NOT NULL,
+ MT_KEY2 bigint NOT NULL,
+ MT_COMMENT varchar(255),
+ PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+--1st 4 cols make up a PK but since WS_PARTITION is nullable we can't declare such PK
+--This is a good candidate for Index organized table
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TBL_TO_TXN_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+CREATE UNIQUE INDEX TBL_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_WRITEID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+CREATE TABLE MIN_HISTORY_LEVEL (
+ MHL_TXNID bigint NOT NULL,
+ MHL_MIN_OPEN_TXNID bigint NOT NULL,
+ PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
+
+CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
+ MRL_TXN_ID BIGINT NOT NULL,
+ MRL_DB_NAME VARCHAR(128) NOT NULL,
+ MRL_TBL_NAME VARCHAR(256) NOT NULL,
+ MRL_LAST_HEARTBEAT BIGINT NOT NULL,
+ PRIMARY KEY(MRL_TXN_ID)
+);
+
+CREATE TABLE "APP"."I_SCHEMA" (
+ "SCHEMA_ID" bigint primary key,
+ "SCHEMA_TYPE" integer not null,
+ "NAME" varchar(256) unique,
+ "DB_ID" bigint references "APP"."DBS" ("DB_ID"),
+ "COMPATIBILITY" integer not null,
+ "VALIDATION_LEVEL" integer not null,
+ "CAN_EVOLVE" char(1) not null,
+ "SCHEMA_GROUP" varchar(256),
+ "DESCRIPTION" varchar(4000)
+);
+
+CREATE TABLE "APP"."SCHEMA_VERSION" (
+ "SCHEMA_VERSION_ID" bigint primary key,
+ "SCHEMA_ID" bigint references "APP"."I_SCHEMA" ("SCHEMA_ID"),
+ "VERSION" integer not null,
+ "CREATED_AT" bigint not null,
+ "CD_ID" bigint references "APP"."CDS" ("CD_ID"),
+ "STATE" integer not null,
+ "DESCRIPTION" varchar(4000),
+ "SCHEMA_TEXT" clob,
+ "FINGERPRINT" varchar(256),
+ "SCHEMA_VERSION_NAME" varchar(256),
+ "SERDE_ID" bigint references "APP"."SERDES" ("SERDE_ID")
+);
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_SCHEMA_VERSION" ON "APP"."SCHEMA_VERSION" ("SCHEMA_ID", "VERSION");
+
+CREATE TABLE REPL_TXN_MAP (
+ RTM_REPL_POLICY varchar(256) NOT NULL,
+ RTM_SRC_TXN_ID bigint NOT NULL,
+ RTM_TARGET_TXN_ID bigint NOT NULL,
+ PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
+);
+
+CREATE TABLE "APP"."RUNTIME_STATS" (
+ "RS_ID" bigint primary key,
+ "CREATE_TIME" integer not null,
+ "WEIGHT" integer not null,
+ "PAYLOAD" BLOB
+);
+
+CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(767) NOT NULL,
+ WNL_TABLE_OBJ clob NOT NULL,
+ WNL_PARTITION_OBJ clob,
+ WNL_FILES clob,
+ WNL_EVENT_TIME integer NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
+-- -----------------------------------------------------------------
+-- Record schema version. Should be the last step in the init script
+-- -----------------------------------------------------------------
+INSERT INTO "APP"."VERSION" (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) VALUES (1, '3.2.0', 'Hive release version 3.2.0');
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
new file mode 100644
index 0000000000..5cdac5fe8c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "nifi",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": ["int", "null"]},
+ {"name": "name", "type": ["string", "null"]},
+ {"name": "department", "type": ["string", "null"]}
+ ]
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
new file mode 100644
index 0000000000..1c0fada816
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-iceberg-services-api-nar</artifactId>
+ <packaging>nar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-services-api</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-services-api-nar</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..6effaa89d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..3e58f19e5b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,501 @@
+nifi-iceberg-services-api-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+ (ASLv2) Apache Iceberg
+ The following NOTICE information applies:
+ Apache Iceberg
+ Copyright 2017-2022 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache Ant
+ The following NOTICE information applies:
+ Apache Ant
+ Copyright 1999-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Apache Commons DBCP
+ The following NOTICE information applies:
+ Apache Commons DBCP
+ Copyright 2001-2015 The Apache Software Foundation.
+
+ (ASLv2) Apache HttpComponents
+ The following NOTICE information applies:
+ Apache HttpComponents Client
+ Copyright 1999-2016 The Apache Software Foundation
+ Apache HttpComponents Core - HttpCore
+ Copyright 2006-2009 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Pool
+ The following NOTICE information applies:
+ Apache Commons Pool
+ Copyright 1999-2009 The Apache Software Foundation.
+
+ (ASLv2) Apache Commons BeanUtils
+ The following NOTICE information applies:
+ Apache Commons BeanUtils
+ Copyright 2000-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons IO
+ The following NOTICE information applies:
+ Apache Commons IO
+ Copyright 2002-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Net
+ The following NOTICE information applies:
+ Apache Commons Net
+ Copyright 2001-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Avro
+ The following NOTICE information applies:
+ Apache Avro
+ Copyright 2009-2017 The Apache Software Foundation
+
+ (ASLv2) Apache Parquet
+ The following NOTICE information applies:
+ Apache Parquet MR (Incubating)
+ Copyright 2014 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Audience Annotations
+ The following NOTICE information applies:
+ Apache Yetus
+ Copyright 2008-2018 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Compress
+ The following NOTICE information applies:
+ Apache Commons Compress
+ Copyright 2002-2017 The Apache Software Foundation
+
+ The files in the package org.apache.commons.compress.archivers.sevenz
+ were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+ which has been placed in the public domain:
+
+ "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
+
+ (ASLv2) Apache Commons Configuration
+ The following NOTICE information applies:
+ Apache Commons Configuration
+ Copyright 2001-2017 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Text
+ The following NOTICE information applies:
+ Apache Commons Text
+ Copyright 2001-2018 The Apache Software Foundation
+
+ (ASLv2) Apache Commons CLI
+ The following NOTICE information applies:
+ Apache Commons CLI
+ Copyright 2001-2017 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache Commons Collections
+ The following NOTICE information applies:
+ Apache Commons Collections
+ Copyright 2001-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Daemon
+ The following NOTICE information applies:
+ Apache Commons Daemon
+ Copyright 2002-2013 The Apache Software Foundation
+
+ (ASLv2) Apache Ivy
+ The following NOTICE information applies:
+ Copyright 2007-2017 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Portions of Ivy were originally developed at
+ Jayasoft SARL (http://www.jayasoft.fr/)
+ and are licensed to the Apache Software Foundation under the
+ "Software Grant License Agreement"
+
+ SSH and SFTP support is provided by the JCraft JSch package,
+ which is open source software, available under
+ the terms of a BSD style license.
+ The original software and related information is available
+ at http://www.jcraft.com/jsch/.
+
+ (ASLv2) Apache Commons Math
+ The following NOTICE information applies:
+ Apache Commons Math
+ Copyright 2001-2012 The Apache Software Foundation
+
+ (ASLv2) Apache Hive
+ The following NOTICE information applies:
+ Apache Hive
+ Copyright 2008-2015 The Apache Software Foundation
+
+ This product includes software developed by The Apache Software
+ Foundation (http://www.apache.org/).
+
+ This product includes Jersey (https://jersey.java.net/)
+ Copyright (c) 2010-2014 Oracle and/or its affiliates.
+
+ This project includes software copyrighted by Microsoft Corporation and
+ licensed under the Apache License, Version 2.0.
+
+ This project includes software copyrighted by Dell SecureWorks and
+ licensed under the Apache License, Version 2.0.
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+ (ASLv2) BoneCP
+ The following NOTICE information applies:
+ BoneCP
+ Copyright 2010 Wallace Wadge
+
+ (ASLv2) Apache Hadoop
+ The following NOTICE information applies:
+ The binary distribution of this product bundles binaries of
+ org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which has the
+ following notices:
+ * Copyright 2011 Dain Sundstrom <da...@iq80.com>
+ * Copyright 2011 FuseSource Corp. http://fusesource.com
+
+ The binary distribution of this product bundles binaries of
+ org.fusesource.hawtjni:hawtjni-runtime (https://github.com/fusesource/hawtjni),
+ which has the following notices:
+ * This product includes software developed by FuseSource Corp.
+ http://fusesource.com
+ * This product includes software developed at
+ Progress Software Corporation and/or its subsidiaries or affiliates.
+ * This product includes software developed by IBM Corporation and others.
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2015 The Apache Software Foundation
+
+ (ASLv2) Apache Curator
+ The following NOTICE information applies:
+ Curator Framework
+ Copyright 2011-2014 The Apache Software Foundation
+
+ Curator Client
+ Copyright 2011-2014 The Apache Software Foundation
+
+ Curator Recipes
+ Copyright 2011-2014 The Apache Software Foundation
+
+ (ASLv2) Apache Derby
+ The following NOTICE information applies:
+ Apache Derby
+ Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
+ the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.
+
+ (ASLv2) Apache Geronimo
+ The following NOTICE information applies:
+ Apache Geronimo
+ Copyright 2003-2008 The Apache Software Foundation
+
+ (ASLv2) Jettison
+ The following NOTICE information applies:
+ Copyright 2006 Envoi Solutions LLC
+
+ (ASLv2) Jetty
+ The following NOTICE information applies:
+ Jetty Web Container
+ Copyright 1995-2019 Mort Bay Consulting Pty Ltd.
+
+ (ASLv2) Apache log4j
+ The following NOTICE information applies:
+ Apache log4j
+ Copyright 2007 The Apache Software Foundation
+
+ (ASLv2) Apache Thrift
+ The following NOTICE information applies:
+ Apache Thrift
+ Copyright 2006-2010 The Apache Software Foundation.
+
+ (ASLv2) Dropwizard Metrics
+ The following NOTICE information applies:
+ Metrics
+ Copyright 2010-2013 Coda Hale and Yammer, Inc.
+
+ This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
+ LongAdder), which was released with the following comments:
+
+ Written by Doug Lea with assistance from members of JCP JSR-166
+ Expert Group and released to the public domain, as explained at
+ http://creativecommons.org/publicdomain/zero/1.0/
+
+ (ASLv2) Joda Time
+ The following NOTICE information applies:
+ This product includes software developed by
+ Joda.org (http://www.joda.org/).
+
+ (ASLv2) The Netty Project
+ The following NOTICE information applies:
+ The Netty Project
+ Copyright 2011 The Netty Project
+
+ (ASLv2) Apache ZooKeeper
+ The following NOTICE information applies:
+ Apache ZooKeeper
+ Copyright 2009-2012 The Apache Software Foundation
+
+ (ASLv2) JPam
+ The following NOTICE information applies:
+ Copyright 2003-2006 Greg Luck
+
+ (ASLv2) Groovy 2.4.16 (http://www.groovy-lang.org)
+ groovy-2.4.16-indy
+ groovy-json-2.4.16-indy
+ groovy-sql-2.4.16-indy
+ The following NOTICE information applies:
+ Apache Groovy
+ Copyright 2003-2018 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ This product includes/uses ANTLR (http://www.antlr2.org/)
+ developed by Terence Parr 1989-2006
+
+ (ASLv2) ASM Based Accessors Helper Used By JSON Smart (net.minidev:accessors-smart:jar:1.2 - http://www.minidev.net/)
+ The following NOTICE information applies:
+ ASM Based Accessors Helper Used By JSON Smart 1.2
+ Copyright 2017, Uriel Chemouni
+
+ (ASLv2) JSON Smart (net.minidev:json-smart:jar:2.3 - http://www.minidev.net/)
+ The following NOTICE information applies:
+ JSON Smart 2.3
+ Copyright 2017, Uriel Chemouni, Eitan Raviv
+
+ (ASLv2) Nimbus JOSE+JWT (com.nimbusds:nimbus-jose-jwt - https://connect2id.com/products/nimbus-jose-jwt)
+ The following NOTICE information applies:
+ Nimbus JOSE+JWT
+ Copyright 2021, Connect2id Ltd.
+
+ (ASLv2) Woodstox (com.fasterxml.woodstox:woodstox-core:bundle:5.3.0 - https://github.com/FasterXML/woodstox)
+ The following NOTICE information applies:
+ Woodstox Core 5.3.0
+ Copyright 2015, FasterXML, LLC
+
+ (ASLv2) Joda Time
+ The following NOTICE information applies:
+ This product includes software developed by
+ Joda.org (http://www.joda.org/).
+
+ (ASLv2) java-util
+ The following NOTICE information applies:
+ java-util
+ Copyright 2011-2017 Metamarkets Group Inc.
+
+ (ASLv2) JCIP Annotations Under Apache License
+ The following NOTICE information applies:
+ JCIP Annotations Under Apache License
+ Copyright 2013 Stephen Connolly.
+
+ (ASLv2) Google GSON
+ The following NOTICE information applies:
+ Copyright 2008 Google Inc.
+
+ (ASLv2) Guava
+ The following NOTICE information applies:
+ Guava
+ Copyright 2015 The Guava Authors
+
+ (ASLv2) OkHttp
+ The following NOTICE information applies:
+ OkHttp
+ Copyright (C) 2014 Square, Inc.
+
+ (ASLv2) Okio
+ The following NOTICE information applies:
+ Okio
+ Copyright (C) 2014 Square, Inc.
+
+ (ASLv2) Dropwizard Metrics
+ The following NOTICE information applies:
+ Dropwizard Metrics
+ Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
+
+ (ASLv2) atinject (javax.inject:javax.inject)
+ The following NOTICE information applies:
+ atinject
+ Copyright
+
+ (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
+
+ (ASLv2) JetBrains/java-annotations
+ The following NOTICE information applies:
+ JetBrains/java-annotations
+ Copyright 2000-2016 JetBrains s.r.o.
+
+ (ASLv2) Apache Kerby
+ The following NOTICE information applies:
+ Apache Kerby
+ Copyright 2003-2018 The Apache Software Foundation
+
+ (ASLv2) Carrotsearch HPPC
+ The following NOTICE information applies:
+ HPPC borrowed code, ideas or both from:
+
+ * Apache Lucene, http://lucene.apache.org/
+ (Apache license)
+ * Fastutil, http://fastutil.di.unimi.it/
+ (Apache license)
+ * Koloboke, https://github.com/OpenHFT/Koloboke
+ (Apache license)
+
+ (ASLv2) Ehcache 2.x
+ The following NOTICE information applies:
+ Copyright 2003-2010 Terracotta, Inc.
+
+ (ASLv2) Google Guice
+ The following NOTICE information applies:
+ Google Guice - Core Library
+ Copyright 2006-2011 Google, Inc.
+
+ Google Guice - Extensions - Servlet
+ Copyright 2006-2011 Google, Inc.
+
+ (ASLv2) Apache Arrow
+ The following NOTICE information applies:
+ Copyright 2016-2019 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+
+ (ASLv2) Apache ORC
+ The following NOTICE information applies:
+ Copyright 2013-2019 The Apache Software Foundation
+
+ This product includes software developed by The Apache Software
+ Foundation (http://www.apache.org/).
+
+ This product includes software developed by Hewlett-Packard:
+ (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+ The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-guice (com.sun.jersey.contribs:jersey-guice:jar:1.19 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.11 - https://jaxb.dev.java.net/)
+
+************************
+Common Development and Distribution License 1.0
+************************
+
+ The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details.
+
+ (CDDL 1.0) JavaServlet(TM) Specification (javax.servlet:servlet-api:jar:3.1.0 - no url available)
+ (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
+ (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
+
+*****************
+Public Domain
+*****************
+
+ The following binary components are provided to the 'Public Domain'. See project link for details.
+
+ (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)
+
+
+************************
+Eclipse Distribution License 1.0
+************************
+
+ The following binary components are provided under the Eclipse Distribution License 1.0.
+
+ (EDL 1.0) Jakarta Activation API (jakarta.activation:jakarta.activation-api:jar:1.2.1)
+ (EDL 1.0) Jakarta XML Binding API (jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3)
+
+************************
+Eclipse Public License 2.0
+************************
+
+ The following binary components are provided under the Eclipse Public License 2.0.
+
+ (EPL 2.0) javax.ws.rs-api (https://github.com/eclipse-ee4j/jaxrs-api) javax.ws.rs:javax.ws.rs-api:bundle:2.1.1
+
+************************
+BSD License
+************************
+
+ (BSD) JSch
+ The following NOTICE information applies:
+ Copyright (c) 2002-2015 Atsuhiko Yamanaka, JCraft,Inc.
+ All rights reserved.
+ https://www.jcraft.com/jsch/
+
+ (BSD 3-Clause) JLine Bundle
+ The following NOTICE information applies:
+ Copyright (c) 2002-2007, Marc Prud'hommeaux. All rights reserved.
+ https://github.com/jline/jline1
+
+ (BSD 3-Clause) ThreeTen-Extra
+ The following NOTICE information applies:
+ Copyright (c) 2007-2022, Stephen Colebourne & Michael Nascimento Santos.
+ https://github.com/ThreeTen/threeten-extra/
+
+************************
+Go License
+************************
+
+The following binary components are provided under the Go License. See project link for details.
+
+ (Go) RE2/J
+ The following NOTICE information applies:
+ Copyright (c) 2009 The Go Authors. All rights reserved.
+ https://github.com/google/re2j
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
new file mode 100644
index 0000000000..0d96a0b180
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-iceberg-services-api</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- Internal dependencies -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+
+ <!-- External dependencies -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-hive-metastore</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-parquet</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-orc</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>co.cask.tephra</groupId>
+ <artifactId>tephra-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>co.cask.tephra</groupId>
+ <artifactId>tephra-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>co.cask.tephra</groupId>
+ <artifactId>tephra-hbase-compat-1.0</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.tdunning</groupId>
+ <artifactId>json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.transaction</groupId>
+ <artifactId>transaction-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
new file mode 100644
index 0000000000..991ac625dc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * Provides a basic connector to Iceberg catalog services.
+ */
+public interface IcebergCatalogService extends ControllerService {
+
+ Catalog getCatalog();
+
+ Configuration getConfiguration();
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/pom.xml
new file mode 100644
index 0000000000..c376976906
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-iceberg-services-nar</artifactId>
+ <packaging>nar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-services</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-services-api-nar</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..6effaa89d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..ee13e4dc2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,17 @@
+nifi-iceberg-services-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+ (ASLv2) Apache Iceberg
+ The following NOTICE information applies:
+ Apache Iceberg
+ Copyright 2017-2022 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
new file mode 100644
index 0000000000..37c4e8150f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-iceberg-services</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-services-api</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
new file mode 100644
index 0000000000..38f156c68d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+
+
+/**
+ * Abstract class holding common properties and methods for Catalog Service implementations.
+ */
+public abstract class AbstractCatalogService extends AbstractControllerService implements IcebergCatalogService {
+
+ protected Configuration configuration = new Configuration();
+
+ static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+ .name("hadoop-config-resources")
+ .displayName("Hadoop Configuration Resources")
+ .description("A file, or comma separated list of files, which contain the Hadoop configuration (core-site.xml, etc.). Without this, default configuration will be used.")
+ .required(false)
+ .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamicallyModifiesClasspath(true)
+ .build();
+
+ /**
+ * Loads configuration files from the provided paths.
+ *
+ * @param configFiles list of config file paths separated with comma
+ * @return merged configuration
+ */
+ protected Configuration getConfigurationFromFiles(String configFiles) {
+ final Configuration conf = new Configuration();
+ if (StringUtils.isNotBlank(configFiles)) {
+ for (final String configFile : configFiles.split(",")) {
+ conf.addResource(new Path(configFile.trim()));
+ }
+ }
+ return conf;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
new file mode 100644
index 0000000000..dcf2dd395f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Tags({"iceberg", "catalog", "service", "hadoop", "hdfs"})
+@CapabilityDescription("Catalog service that can use HDFS or similar file systems that support atomic rename.")
+public class HadoopCatalogService extends AbstractCatalogService {
+
+ static final PropertyDescriptor WAREHOUSE_PATH = new PropertyDescriptor.Builder()
+ .name("warehouse-path")
+ .displayName("Warehouse Path")
+ .description("Path to the location of the warehouse.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ WAREHOUSE_PATH,
+ HADOOP_CONFIGURATION_RESOURCES
+ ));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ private HadoopCatalog catalog;
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final String warehousePath = context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue();
+
+ if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
+ final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+
+ configuration = getConfigurationFromFiles(configFiles);
+ catalog = new HadoopCatalog(configuration, warehousePath);
+ } else {
+ catalog = new HadoopCatalog(new Configuration(), warehousePath);
+ }
+ }
+
+ @Override
+ public Catalog getCatalog() {
+ return catalog;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
new file mode 100644
index 0000000000..9dc648c3e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"iceberg", "catalog", "service", "metastore", "hive"})
+@CapabilityDescription("Catalog service that connects to a Hive metastore to keep track of Iceberg tables.")
+public class HiveCatalogService extends AbstractCatalogService {
+
+ static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+ .name("hive-metastore-uri")
+ .displayName("Hive Metastore URI")
+ .description("The URI location(s) for the Hive metastore; note that this is not the location of the Hive Server. The default port for the Hive metastore is 9083.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.URI_LIST_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor WAREHOUSE_LOCATION = new PropertyDescriptor.Builder()
+ .name("warehouse-location")
+ .displayName("Default Warehouse Location")
+ .description("Location of default database for the warehouse. This field sets or overrides the 'hive.metastore.warehouse.dir' configuration property.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ METASTORE_URI,
+ WAREHOUSE_LOCATION,
+ HADOOP_CONFIGURATION_RESOURCES
+ ));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ private HiveCatalog catalog;
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+
+ final List<ValidationResult> problems = new ArrayList<>();
+ String configMetastoreUri = null;
+ String configWarehouseLocation = null;
+
+ if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
+ final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+
+ Configuration configuration = getConfigurationFromFiles(configFiles);
+ configMetastoreUri = configuration.get("hive.metastore.uris");
+ configWarehouseLocation = configuration.get("hive.metastore.warehouse.dir");
+ }
+
+ final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
+ final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
+
+ if (configMetastoreUri == null && propertyMetastoreUri == null) {
+ problems.add(new ValidationResult.Builder()
+ .subject("Hive Metastore URI")
+ .valid(false)
+ .explanation("cannot find hive metastore uri, please provide it in the 'Hive Metastore URI' property" +
+ " or provide a configuration file which contains 'hive.metastore.uris' value.")
+ .build());
+ }
+
+ if (configWarehouseLocation == null && propertyWarehouseLocation == null) {
+ problems.add(new ValidationResult.Builder()
+ .subject("Default Warehouse Location")
+ .valid(false)
+ .explanation("cannot find default warehouse location, please provide it in the 'Default Warehouse Location' property" +
+ " or provide a configuration file which contains 'hive.metastore.warehouse.dir' value.")
+ .build());
+ }
+
+ return problems;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ catalog = new HiveCatalog();
+ Map<String, String> properties = new HashMap<>();
+
+ if (context.getProperty(METASTORE_URI).isSet()) {
+ properties.put(CatalogProperties.URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
+ }
+
+ if (context.getProperty(WAREHOUSE_LOCATION).isSet()) {
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue());
+ }
+
+ if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
+ final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+
+ configuration = getConfigurationFromFiles(configFiles);
+ catalog.setConf(configuration);
+ }
+
+ catalog.initialize("hive-catalog", properties);
+ }
+
+ @Override
+ public Catalog getCatalog() {
+ return catalog;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100755
index 0000000000..4c042964b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.services.iceberg.HiveCatalogService
+org.apache.nifi.services.iceberg.HadoopCatalogService
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/pom.xml
new file mode 100644
index 0000000000..6eb5af7028
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-nar-bundles</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <packaging>pom</packaging>
+
+ <properties>
+ <iceberg.version>0.14.0</iceberg.version>
+ <hive.version>3.1.3</hive.version>
+ <hadoop.version>3.3.3</hadoop.version>
+ </properties>
+
+ <modules>
+ <module>nifi-iceberg-services-api</module>
+ <module>nifi-iceberg-services-api-nar</module>
+ <module>nifi-iceberg-services</module>
+ <module>nifi-iceberg-services-nar</module>
+ <module>nifi-iceberg-processors</module>
+ <module>nifi-iceberg-processors-nar</module>
+ </modules>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 9856777d0d..d9521d3396 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -119,6 +119,7 @@
<module>nifi-box-bundle</module>
<module>nifi-flow-registry-client-bundle</module>
<module>nifi-shopify-bundle</module>
+ <module>nifi-iceberg-bundle</module>
</modules>
<build>