Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/06 08:10:42 UTC

[GitHub] [nifi] mark-bathori opened a new pull request, #6368: NIFI-10442: Create PutIceberg processor

mark-bathori opened a new pull request, #6368:
URL: https://github.com/apache/nifi/pull/6368

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-00000](https://issues.apache.org/jira/browse/NIFI-00000)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   




[GitHub] [nifi] nandorsoma commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r982188863


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+
+/**
+ * Base Iceberg processor class.
+ */
+public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    private Configuration configuration;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+
+        if (catalogService != null) {
+            this.configuration = catalogService.getConfiguration();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;

Review Comment:
   I am not sure, but I think we should assign null to the kerberosUser in a finally block or we should rethrow the KerberosLoginException.
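
A minimal sketch of the `finally` variant suggested above, written against stand-in types so it compiles on its own. `LogoutCapable` and `KerberosUserHolder` are hypothetical names used only for illustration; they are not part of NiFi, and `lastError` merely stands in for the logger call in the processor. The point is that the reference is cleared even when `logout()` throws.

```java
/** Hypothetical stand-in for the Kerberos user, reduced to the one call used here. */
interface LogoutCapable {
    void logout() throws Exception;
}

/** Holds a user reference and always clears it when the component stops. */
class KerberosUserHolder {

    private volatile LogoutCapable user;

    // Stands in for the logger call made by the processor (assumption for this sketch).
    private String lastError;

    void set(final LogoutCapable user) {
        this.user = user;
    }

    boolean hasUser() {
        return user != null;
    }

    String lastError() {
        return lastError;
    }

    /** Mirrors the shape of the @OnStopped method under review. */
    void close() {
        final LogoutCapable current = user;
        if (current == null) {
            return;
        }
        try {
            current.logout();
        } catch (final Exception e) {
            // Record the failure instead of swallowing it silently.
            lastError = e.getMessage();
        } finally {
            // Runs on both the success and the failure path.
            user = null;
        }
    }
}
```

With this shape a failed logout cannot leave a stale user behind for the next scheduling cycle. The other option mentioned above, rethrowing the exception, would surface the failure to the framework instead; the two can also be combined.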



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is responsible for schema traversal and data conversion between NiFi and Iceberg internal record structure.
+ */
+public class IcebergRecordConverter {
+
+    private final DataConverter<Record, GenericRecord> converter;
+
+    public GenericRecord convert(Record record) {
+        return converter.convert(record);
+    }
+
+    @SuppressWarnings("unchecked")
+    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
+        this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
+    }
+
+    private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
+
+        public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
+            return visit(schema, recordDataType, new IcebergSchemaVisitor(), new IcebergPartnerAccessors(fileFormat));
+        }
+
+        @Override
+        public DataConverter<?, ?> schema(Schema schema, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> field(Types.NestedField field, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> primitive(Type.PrimitiveType type, DataType dataType) {
+            if (type.typeId() != null) {
+                switch (type.typeId()) {
+                    case BOOLEAN:
+                    case INTEGER:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case DATE:
+                    case STRING:
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case TIME:
+                        return GenericDataConverters.TimeConverter.INSTANCE;
+                    case TIMESTAMP:
+                        Types.TimestampType timestampType = (Types.TimestampType) type;

Review Comment:
   Missing `final` modifiers, for example on `timestampType`.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format. " +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used. " +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {

Review Comment:
   Probably I'm blind, but I don't see where this logic was moved. Don't we need this validation anymore?



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+
+public class TestPutIcebergWithHadoopCatalog {
+
+    private TestRunner runner;
+    private PutIceberg processor;
+    private Schema inputSchema;
+
+    public static Namespace namespace = Namespace.of("test_metastore");
+
+    public static TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "date");
+
+    public static org.apache.iceberg.Schema DATE_SCHEMA = new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timeMicros", Types.TimeType.get()),
+            Types.NestedField.required(2, "timestampMicros", Types.TimestampType.withZone()),
+            Types.NestedField.required(3, "date", Types.DateType.get())
+    );
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        final String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")), StandardCharsets.UTF_8);
+        inputSchema = new Schema.Parser().parse(avroSchema);
+
+        processor = new PutIceberg();
+    }
+
+    private void initRecordReader() throws InitializationException {
+        MockRecordParser readerFactory = new MockRecordParser();

Review Comment:
   Question still stands, not mandatory, just write a -1 if you don't agree.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format. " +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used. " +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {

Review Comment:
   thx



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/KerberosAwareBaseProcessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A base processor class for Kerberos aware usage.
+ */
+public abstract class KerberosAwareBaseProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws IOException {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;
+            } catch (final KerberosLoginException e) {
+                getLogger().debug("Error logging out keytab user", e);

Review Comment:
   I think this problem still stands; it has just moved to AbstractIcebergProcessor. What do you think?



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+
+/**
+ * Base Iceberg processor class.
+ */
+public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    private Configuration configuration;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+
+        if (catalogService != null) {
+            this.configuration = catalogService.getConfiguration();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;
+            } catch (KerberosLoginException e) {
+                getLogger().debug("Error logging out keytab user", e);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final KerberosUser kerberosUser = getKerberosUser();
+        if (kerberosUser == null) {
+            doOnTrigger(context, session);
+        } else {
+            try {
+                final UserGroupInformation ugi = getUgiForKerberosUser(configuration, kerberosUser);
+
+                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+                    doOnTrigger(context, session);
+                    return null;
+                });
+
+            } catch (Exception e) {
+                throw new ProcessException(e);

Review Comment:
   In general, exceptions should include a basic message instead of just wrapping the cause.
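
   A minimal sketch of the suggestion, not the PR's actual code. The nested `ProcessException` is a stand-in so the example compiles without NiFi on the classpath, and `runAsUser` plus the message wording are hypothetical. The real `ProcessException` accepts a message and a cause in the same way, as the `new ProcessException("Failed to abort uncommitted data files.", ex)` call elsewhere in this PR shows.

```java
import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class WrappedExceptionSketch {

    // Stand-in for org.apache.nifi.processor.exception.ProcessException (assumption:
    // used here only so the sketch is self-contained).
    public static class ProcessException extends RuntimeException {
        public ProcessException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: runs the action and wraps any failure with a short,
    // descriptive message while keeping the original exception as the cause.
    public static void runAsUser(String principal, PrivilegedExceptionAction<Void> action) {
        try {
            action.run();
        } catch (Exception e) {
            throw new ProcessException("Privileged action failed for Kerberos user " + principal, e);
        }
    }

    public static void main(String[] args) {
        try {
            runAsUser("nifi@EXAMPLE.COM", () -> {
                throw new IOException("catalog unreachable");
            });
        } catch (ProcessException e) {
            // The message says what was being attempted; the cause says why it failed.
            System.out.println(e.getMessage());
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

   With a message in place, the log entry identifies the failing operation without the reader having to dig through the nested stack trace.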



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is responsible for schema traversal and data conversion between NiFi and Iceberg internal record structure.
+ */
+public class IcebergRecordConverter {
+
+    private final DataConverter<Record, GenericRecord> converter;
+
+    public GenericRecord convert(Record record) {
+        return converter.convert(record);
+    }
+
+    @SuppressWarnings("unchecked")
+    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
+        this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
+    }
+
+    private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
+
+        public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
+            return visit(schema, recordDataType, new IcebergSchemaVisitor(), new IcebergPartnerAccessors(fileFormat));
+        }
+
+        @Override
+        public DataConverter<?, ?> schema(Schema schema, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> field(Types.NestedField field, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> primitive(Type.PrimitiveType type, DataType dataType) {
+            if (type.typeId() != null) {
+                switch (type.typeId()) {
+                    case BOOLEAN:
+                    case INTEGER:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case DATE:
+                    case STRING:
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case TIME:
+                        return GenericDataConverters.TimeConverter.INSTANCE;
+                    case TIMESTAMP:
+                        Types.TimestampType timestampType = (Types.TimestampType) type;
+                        if (timestampType.shouldAdjustToUTC()) {
+                            return GenericDataConverters.TimestampWithTimezoneConverter.INSTANCE;
+                        }
+                        return GenericDataConverters.TimestampConverter.INSTANCE;
+                    case UUID:
+                        UUIDDataType uuidType = (UUIDDataType) dataType;
+                        if (uuidType.getFileFormat() == FileFormat.PARQUET) {
+                            return GenericDataConverters.UUIDtoByteArrayConverter.INSTANCE;
+                        }
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case FIXED:
+                        Types.FixedType fixedType = (Types.FixedType) type;
+                        return new GenericDataConverters.FixedConverter(fixedType.length());
+                    case BINARY:
+                        return GenericDataConverters.BinaryConverter.INSTANCE;
+                    case DECIMAL:
+                        Types.DecimalType decimalType = (Types.DecimalType) type;
+                        return new GenericDataConverters.BigDecimalConverter(decimalType.precision(), decimalType.scale());
+                    default:
+                        throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
+                }
+            }
+            return null;

Review Comment:
   Is it a valid scenario to return null here? If we cannot support an unknown type, how can we support a null type?
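
   One way to address this, sketched under assumptions rather than taken from the PR: treat a missing type ID the same way as an unsupported one and fail fast, so callers never receive a null converter. The `TypeId` enum and `converterFor` method below are hypothetical stand-ins for Iceberg's type IDs and the visitor's `primitive` method.

```java
import java.util.Objects;
import java.util.function.Function;

public class PrimitiveConverterSketch {

    // Hypothetical stand-in for Iceberg's type IDs; only a few values are listed.
    public enum TypeId { BOOLEAN, STRING, STRUCT }

    // Returns a converter for supported types and throws for everything else,
    // including a null type ID, instead of silently returning null.
    public static Function<Object, Object> converterFor(TypeId typeId) {
        Objects.requireNonNull(typeId, "typeId must not be null");
        switch (typeId) {
            case BOOLEAN:
            case STRING:
                // Same-type conversion: the value is passed through unchanged.
                return Function.identity();
            default:
                throw new UnsupportedOperationException("Unsupported type: " + typeId);
        }
    }

    public static void main(String[] args) {
        System.out.println(converterFor(TypeId.STRING).apply("abc"));
        try {
            converterFor(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   If the type ID can never be null in practice, the explicit check could simply be dropped along with the `return null` branch.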



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();

Review Comment:
   thx



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+            getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files.", e);
+            try {
+                if (taskWriter != null) {
+                    taskWriter.abort();
+                }
+            } catch (IOException ex) {
+                throw new ProcessException("Failed to abort uncommitted data files.", ex);
+            }
+
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+
+    private Table initializeTable(PropertyContext context) {
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+
+        final Catalog catalog = catalogService.getCatalog();
+
+        final Namespace namespace = Namespace.of(catalogNamespace);
+        final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
+
+        return catalog.loadTable(tableIdentifier);

Review Comment:
   I think this question still stands, but even if you agree, I don't think we should do that in this PR.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/appender/avro/AvroWithNifiSchemaVisitor.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.appender.avro;
+
+import com.google.common.base.Preconditions;
+import org.apache.iceberg.avro.AvroWithPartnerByStructureVisitor;
+import org.apache.iceberg.util.Pair;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+/**
+ * This class contains Avro specific visitor methods to traverse schema and build value writer list for data types.
+ */
+public class AvroWithNifiSchemaVisitor<T> extends AvroWithPartnerByStructureVisitor<DataType, T> {
+
+    @Override
+    protected boolean isStringType(DataType dataType) {
+        return dataType.getFieldType().equals(RecordFieldType.STRING);
+    }
+
+    @Override
+    protected boolean isMapType(DataType dataType) {
+        return dataType instanceof MapDataType;
+    }
+
+    @Override
+    protected DataType arrayElementType(DataType arrayType) {
+        Preconditions.checkArgument(arrayType instanceof ArrayDataType, "Invalid array: %s is not an array", arrayType);

Review Comment:
   Thanks for excluding it!





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r967896686


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {

Review Comment:
   Are we sure we want to route the FlowFile to failure only on checked exceptions? For example, `commit` doesn't declare a checked exception, but I can imagine it easily throwing a runtime exception. If we don't yield the processor on an IOException, we probably don't want to yield in that case either.
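
One way to read this suggestion is sketched below. It is a minimal, self-contained illustration and not the processor's actual code: `SketchWriter`, `Route`, `BroadCatchSketch` and `FailingCommitWriter` are hypothetical stand-ins for the Iceberg task writer, the NiFi relationships and the processor logic. The point is only that catching `Exception` sends unchecked commit failures down the same failure path as an `IOException`.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for the task writer; only the calls used below are modeled.
interface SketchWriter {
    void write(String record) throws IOException;

    void commit(); // declares no checked exception, but may still fail at runtime

    void abort() throws IOException;
}

// Hypothetical routing outcome standing in for the success and failure relationships.
enum Route { SUCCESS, FAILURE }

final class BroadCatchSketch {

    // Writes all records and commits; any exception, checked or unchecked, routes to FAILURE.
    static Route ingest(List<String> records, SketchWriter writer) {
        try {
            for (String record : records) {
                writer.write(record);
            }
            writer.commit();
        } catch (Exception e) {
            // Catching Exception covers IOException as well as runtime failures from commit().
            try {
                writer.abort();
            } catch (IOException abortFailure) {
                e.addSuppressed(abortFailure);
            }
            return Route.FAILURE; // return early so the success path is not reached
        }
        return Route.SUCCESS;
    }
}

// Demo writer whose commit fails with an unchecked exception.
final class FailingCommitWriter implements SketchWriter {
    boolean aborted;

    @Override
    public void write(String record) {
        // accepts every record
    }

    @Override
    public void commit() {
        throw new IllegalStateException("commit rejected");
    }

    @Override
    public void abort() {
        aborted = true;
    }
}
```

With `FailingCommitWriter`, `BroadCatchSketch.ingest(...)` returns `Route.FAILURE` and the writer is aborted, even though no checked exception was thrown. Whether the real processor should also yield in this case is a separate decision that the sketch does not model.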



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {

Review Comment:
   @turcsanyip already mentioned that this part would fit in the verify method. What I would add is that the validation message now contains an exception when the table is not an Iceberg table:
   ```
   Component is invalid: 'Component' is invalid because Failed to perform validation due to org.apache.iceberg.exceptions.NoSuchIcebergTableException: Not an iceberg table: hive-catalog.nifitest.icebergtable (type=null)
   ``` 
   I'm not sure whether it is OK to leak exceptions during validation, or whether we should use predefined error messages. Predefined messages are usually better, but we would then need a generic fallback message, which wouldn't be helpful at all.
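
A possible middle ground is sketched below, under stated assumptions: `ValidationMessageSketch` and `NotAnIcebergTableException` are hypothetical names (the latter stands in for Iceberg's `NoSuchIcebergTableException`), and the table loader is modeled as a plain `Supplier`. Known failures get a predefined message, while the fallback keeps the cause's message so it is still of some use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ValidationMessageSketch {

    // Hypothetical stand-in for the exception thrown when the table is not an Iceberg table.
    static final class NotAnIcebergTableException extends RuntimeException {
        NotAnIcebergTableException(String message) {
            super(message);
        }
    }

    // Loads the table via the supplier and returns validation explanations (empty means valid).
    static List<String> validate(String tableName, Supplier<?> tableLoader) {
        List<String> problems = new ArrayList<>();
        try {
            tableLoader.get();
        } catch (NotAnIcebergTableException e) {
            // Known failure: predefined message without the exception class name.
            problems.add("Table '" + tableName + "' exists but is not an Iceberg table.");
        } catch (RuntimeException e) {
            // Unknown failure: generic fallback that still carries the cause's message.
            problems.add("Failed to load table '" + tableName + "': " + e.getMessage());
        }
        return problems;
    }
}
```

The trade-off raised in the comment remains: every new failure mode either needs its own `catch` branch or ends up in the fallback.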



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/appender/avro/AvroWithNifiSchemaVisitor.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.appender.avro;
+
+import com.google.common.base.Preconditions;
+import org.apache.iceberg.avro.AvroWithPartnerByStructureVisitor;
+import org.apache.iceberg.util.Pair;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+/**
+ * This class contains Avro specific visitor methods to traverse schema and build value writer list for data types.
+ */
+public class AvroWithNifiSchemaVisitor<T> extends AvroWithPartnerByStructureVisitor<DataType, T> {
+
+    @Override
+    protected boolean isStringType(DataType dataType) {
+        return dataType.getFieldType().equals(RecordFieldType.STRING);
+    }
+
+    @Override
+    protected boolean isMapType(DataType dataType) {
+        return dataType instanceof MapDataType;
+    }
+
+    @Override
+    protected DataType arrayElementType(DataType arrayType) {
+        Preconditions.checkArgument(arrayType instanceof ArrayDataType, "Invalid array: %s is not an array", arrayType);

Review Comment:
   As far as I know, we aim to avoid Guava in NiFi. I just checked: for most dependencies we either exclude it or use the provided version. What complicates things here is that the hive-exec-3.1.3 library appears to shade it. I don't think we can get rid of it entirely, but we shouldn't use it directly in plain NiFi code.
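
For illustration, the Guava call could be replaced with a few lines of plain Java. The helper below is a minimal sketch; `ArgumentChecks` is a hypothetical name, not an existing NiFi utility. Note one assumption: it formats with `String.format`, which is stricter than Guava's lenient `%s` substitution.

```java
final class ArgumentChecks {

    private ArgumentChecks() {
    }

    // Throws IllegalArgumentException with a formatted message when the condition is false.
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }
}
```

The call site in `arrayElementType` could then read `ArgumentChecks.checkArgument(arrayType instanceof ArrayDataType, "Invalid array: %s is not an array", arrayType);` with the Guava import removed.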



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+
+public class TestPutIcebergWithHadoopCatalog {
+
+    private TestRunner runner;
+    private PutIceberg processor;
+    private Schema inputSchema;
+
+    public static Namespace namespace = Namespace.of("test_metastore");
+
+    public static TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "date");
+
+    public static org.apache.iceberg.Schema DATE_SCHEMA = new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timeMicros", Types.TimeType.get()),
+            Types.NestedField.required(2, "timestampMicros", Types.TimestampType.withZone()),
+            Types.NestedField.required(3, "date", Types.DateType.get())
+    );
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        final String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")), StandardCharsets.UTF_8);
+        inputSchema = new Schema.Parser().parse(avroSchema);
+
+        processor = new PutIceberg();
+    }
+
+    private void initRecordReader() throws InitializationException {
+        MockRecordParser readerFactory = new MockRecordParser();

Review Comment:
   As far as I know, `final` is not mandatory in tests, but you use it in some places and not in others. Could you apply it consistently?



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+            getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files.", e);
+            try {
+                //Guard on the writer itself, since it is the object being aborted.
+                if (taskWriter != null) {
+                    taskWriter.abort();
+                }
+            } catch (IOException ex) {
+                throw new ProcessException("Failed to abort uncommitted data files.", ex);
+            }
+
+            session.transfer(flowFile, REL_FAILURE);
+            //Return here so the FlowFile is not also routed to success below.
+            return;
+        }
+
+        flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+
+    private Table initializeTable(PropertyContext context) {
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+
+        final Catalog catalog = catalogService.getCatalog();
+
+        final Namespace namespace = Namespace.of(catalogNamespace);
+        final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
+
+        return catalog.loadTable(tableIdentifier);

Review Comment:
   If I understand correctly, every trigger causes two network interactions with the catalog's backing database. If all data goes to the same table, it would make sense to cache the returned `Table`. Is that possible, and does it make sense?
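
To make the caching question concrete, here is a minimal, self-contained sketch of a per-table cache. It is not code from this PR: `TableCacheSketch` and its loader function are hypothetical names, and the generic type `T` stands in for Iceberg's `Table` so the example runs without Iceberg on the classpath. It assumes the loader wraps something like `catalog.loadTable(...)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: memoizes one loaded table per "namespace.table" key. */
public class TableCacheSketch<T> {

    private final Map<String, T> cache = new ConcurrentHashMap<>();

    // Assumption: in a processor this would delegate to the catalog lookup.
    private final Function<String, T> loader;

    public TableCacheSketch(Function<String, T> loader) {
        this.loader = loader;
    }

    /** Returns the cached table, invoking the loader only on the first request for a key. */
    public T get(String namespace, String tableName) {
        return cache.computeIfAbsent(namespace + "." + tableName, loader);
    }

    /** Drops all cached entries, for example when the processor is stopped. */
    public void clear() {
        cache.clear();
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        TableCacheSketch<String> tables = new TableCacheSketch<>(id -> {
            loads.incrementAndGet();
            return "table:" + id;
        });

        tables.get("db", "events");
        tables.get("db", "events");
        System.out.println("loader invocations: " + loads.get());
    }
}
```

One trade-off with this approach is staleness: a cached table object does not see metadata changes committed by other writers unless it is refreshed or the cache is cleared, so whether caching is safe depends on how the table is used.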



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/KerberosAwareBaseProcessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A base processor class for Kerberos aware usage.
+ */
+public abstract class KerberosAwareBaseProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws IOException {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;
+            } catch (final KerberosLoginException e) {
+                getLogger().debug("Error logging out keytab user", e);

Review Comment:
   I'm not a Kerberos expert, but from the Kerberos code across NiFi I see that when logout fails we either rethrow the exception as a `ProcessException` or at least clear the reference to `kerberosUser` in a `finally` block. I would probably prefer the latter.
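
The second option (clearing the reference in a `finally` block) can be sketched as follows. This is an illustration only: `LogoutSketch`, `User` and `Holder` are hypothetical stand-ins so the example is self-contained, and they are not the NiFi `KerberosUser` API.

```java
public class LogoutSketch {

    /** Hypothetical stand-in for a Kerberos user; only logout() is modelled. */
    interface User {
        void logout() throws Exception;
    }

    /** Hypothetical holder that mirrors the processor's kerberosUser field. */
    static class Holder {
        private volatile User user;

        Holder(User user) {
            this.user = user;
        }

        boolean hasUser() {
            return user != null;
        }

        /** Attempts logout; the reference is cleared whether or not logout succeeds. */
        void close() {
            final User current = user;
            if (current == null) {
                return;
            }
            try {
                current.logout();
            } catch (Exception e) {
                // A real processor would log through its component logger instead.
                System.err.println("Error logging out user: " + e.getMessage());
            } finally {
                user = null;
            }
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder(() -> {
            throw new IllegalStateException("simulated logout failure");
        });
        holder.close();
        System.out.println("user still referenced: " + holder.hasUser());
    }
}
```

With the assignment in `finally`, a failed logout no longer leaves a stale user behind for the next scheduling cycle, which is the concern with the quoted code, where `kerberosUser = null` is skipped when `logout()` throws.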



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();

Review Comment:
   This can be final.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/NifiRecordWrapper.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.StructLike;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+
+/**
+ * Class to wrap and adapt {@link Record} to Iceberg {@link StructLike} for partition handling usage like {@link
+ * org.apache.iceberg.PartitionKey#partition(StructLike)}
+ */
+public class NifiRecordWrapper implements StructLike {
+
+    private final DataType[] types;
+
+    private final BiFunction<Record, Integer, ?>[] getters;
+
+    private Record record = null;
+
+    @SuppressWarnings("unchecked")
+    public NifiRecordWrapper(RecordSchema schema) {
+        this.types = schema.getDataTypes().toArray(new DataType[0]);
+        this.getters = Stream.of(types).map(NifiRecordWrapper::getter).toArray(BiFunction[]::new);
+    }
+
+    public NifiRecordWrapper wrap(Record record) {
+        this.record = record;
+        return this;
+    }
+
+    @Override
+    public int size() {
+        return types.length;
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+        if (record.getSchema().getField(pos) == null) {
+            return null;
+        } else if (getters[pos] != null) {
+            return javaClass.cast(getters[pos].apply(record, pos));
+        }
+
+        return javaClass.cast(record.getValue(record.getSchema().getField(pos)));
+    }
+
+    @Override
+    public <T> void set(int i, T t) {
+        throw new UnsupportedOperationException("Record wrapper update is unsupported.");
+    }
+
+    private static BiFunction<Record, Integer, ?> getter(DataType type) {
+        if (type.equals(RecordFieldType.TIMESTAMP.getDataType())) {
+            return (row, pos) -> {
+                final RecordField field = row.getSchema().getField(pos);
+                final Object value = row.getValue(field);
+
+                Timestamp timestamp = DataTypeUtils.toTimestamp(value, () -> DataTypeUtils.getDateFormat(type.getFormat()), field.getFieldName());

Review Comment:
   Missing a few finals in this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] AbdelrahimKA commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "AbdelrahimKA (via GitHub)" <gi...@apache.org>.
AbdelrahimKA commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1501281026

   Hi @pvillard31, thanks for your help!
   I did this and everything looks fine, but when I configure the processor I get this error:
   
   **PutIceberg[id=018710cc-a9b6-1854-2954-e2178e328ca7] Failed to load table from catalog: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
   - Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found**
   
   NiFi version: 1.20
   S3 file system: MinIO
   Hive Catalog Service: configured and enabled
   core-site.xml:
   ```
   <configuration>
   <property>
   	<name>hive.metastore.warehouse.dir</name>
   	<value>s3a://icebergdb/</value>
   </property>    
   <property>
   	<name>fs.s3a.connection.ssl.enabled</name>
   	<value>false</value>
   	<description>Enables or disables SSL connections to S3.</description>
   </property>
   
   <property>
   	<name>fs.s3a.aws.credentials.provider</name>
   	<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
   </property>
   
   <property>
   	<name>fs.s3a.endpoint</name>
   	<value>http://x.x.x.x:9900</value>
   </property>
   
   <property>
   	<name>fs.s3a.access.key</name>
   	<value>xxxxxxxxxxxxxxxx</value>
   </property>
   
   <property>
   	<name>fs.s3a.secret.key</name>
   	<value>xxxxxxxxxxxxxxxxxxxx</value>
   </property>
   
   <property>
   	<name>fs.s3a.path.style.access</name>
   	<value>true</value>
   </property>
   
   </configuration>
   ```
   
   Do I need to add an additional NAR file to work with the S3 file system?
   
   Thanks and Best regards
   Abdelrahim Ahmad




[GitHub] [nifi] mark-bathori commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
mark-bathori commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r982305303


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {

Review Comment:
   The V2 table format restriction was removed because data insertion doesn't require any additional change for the different table format versions.





[GitHub] [nifi] abdelrahim-ahmad commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "abdelrahim-ahmad (via GitHub)" <gi...@apache.org>.
abdelrahim-ahmad commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1480050425

   Hi all, thanks for this great feature. I've installed NiFi 1.20.0 but I can't find the new PutIceberg processor.
   Why is that? Is there anything I should consider?
   




[GitHub] [nifi] mark-bathori commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
mark-bathori commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1245152134

   Thanks @turcsanyip and @nandorsoma for the review comments. I tried to fix all of the mentioned issues in my current commit. I’ve also added a couple of new elements to my PR:
   
   - added file format and target file size properties to the Put processor
   - added the option to provide Hadoop configuration file in the Catalog Services
   - removed the V2 table format restriction since it felt unnecessary after looking into it more
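
As a rough illustration of how the new target file size property might be resolved, here is a minimal sketch. It assumes the fallback order given in the processor's property description (processor property first, then the `write.target-file-size-bytes` table property, then 512 MB). The class `TargetFileSizeSketch` and its `resolve` method are hypothetical and are not the code from this PR.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper for illustration only; not the implementation in this PR.
final class TargetFileSizeSketch {

    // Table property named in the processor's property description.
    static final String TARGET_FILE_SIZE_PROPERTY = "write.target-file-size-bytes";

    // 512 MB, the default quoted in the property description.
    static final long DEFAULT_TARGET_FILE_SIZE = 512L * 1024 * 1024;

    // The processor property wins, then the table property, then the default.
    static long resolve(Map<String, String> tableProperties, String processorValue) {
        if (processorValue != null && !processorValue.trim().isEmpty()) {
            return Long.parseLong(processorValue.trim());
        }
        String tableValue = tableProperties.get(TARGET_FILE_SIZE_PROPERTY);
        if (tableValue != null) {
            return Long.parseLong(tableValue.trim());
        }
        return DEFAULT_TARGET_FILE_SIZE;
    }

    public static void main(String[] args) {
        // Neither the processor property nor the table property is set.
        System.out.println(resolve(Collections.emptyMap(), null));
        // The processor property overrides the table property.
        System.out.println(resolve(
                Collections.singletonMap(TARGET_FILE_SIZE_PROPERTY, "1024"), "2048"));
    }
}
```

Keeping the lookup in one small method would make the precedence easy to unit test without a catalog.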




[GitHub] [nifi] AbdelrahimKA commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "AbdelrahimKA (via GitHub)" <gi...@apache.org>.
AbdelrahimKA commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1551567048

   Hi @MohamedAdelHsn,
    Unfortunately, there is currently no way to send data directly to the open table formats (Delta Lake, Hudi or Iceberg) from NiFi.
    I've tried this with other processors such as PutDatabaseRecord. PutDatabaseRecord has autocommit disabled, so you cannot even use it with Trino or Dremio.
    The only way, which is not recommended, is to use the PutSQL processor, which needs some pre-processing steps before sending the data to Trino or Dremio.
   I have opened a ticket in Jira ([Ticket-11449](https://issues.apache.org/jira/browse/NIFI-11449)); hopefully someone will consider this.
   
   Hope this helps you.
   Regards




[GitHub] [nifi] mark-bathori commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
mark-bathori commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r982309957


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+
+public class TestPutIcebergWithHadoopCatalog {
+
+    private TestRunner runner;
+    private PutIceberg processor;
+    private Schema inputSchema;
+
+    public static Namespace namespace = Namespace.of("test_metastore");
+
+    public static TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "date");
+
+    public static org.apache.iceberg.Schema DATE_SCHEMA = new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timeMicros", Types.TimeType.get()),
+            Types.NestedField.required(2, "timestampMicros", Types.TimestampType.withZone()),
+            Types.NestedField.required(3, "date", Types.DateType.get())
+    );
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        final String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")), StandardCharsets.UTF_8);
+        inputSchema = new Schema.Parser().parse(avroSchema);
+
+        processor = new PutIceberg();
+    }
+
+    private void initRecordReader() throws InitializationException {
+        MockRecordParser readerFactory = new MockRecordParser();

Review Comment:
   I removed every `final` keyword from the test classes, except on the static values, to be consistent. Is that ok?





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r992174983


##########
nifi-assembly/pom.xml:
##########
@@ -1528,5 +1528,31 @@ language governing permissions and limitations under the License. -->
                 </plugins>
             </build>
         </profile>
+        <profile>

Review Comment:
   Oh yes, this profile is about including the nars in the assembly. My bad. You are right! 





[GitHub] [nifi] pvillard31 commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "pvillard31 (via GitHub)" <gi...@apache.org>.
pvillard31 commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1480803507

   It'll very likely remain like this. Many components are just provided as NARs via Maven Central. We have to keep the binary size as small as possible.




[GitHub] [nifi] turcsanyip commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1253722349

   @mark-bathori Thanks for the latest changes! I marked my review comments as resolved and also tested the file format property and V1 tables.
   
   I looked into the data conversion code, and there are now 3 separate implementations for the Avro, Parquet and ORC conversions, each using the "low level" Iceberg API to write these data files.
   However, the Iceberg API also has a `GenericRecord` implementation that could be used for the conversion. We would convert NiFi's `Record` object to Iceberg's `GenericRecord` object once, and then Iceberg would convert the `GenericRecord` to Avro, Parquet or ORC, because it already knows how to do that.
   That would make the code on our side more robust and maintainable.
   Do you think it makes sense?
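
To illustrate the shape of this suggestion, here is a minimal sketch that uses stand-in types only. `IntermediateRecord` plays the role that Iceberg's `GenericRecord` would play, and `FormatWriter` stands in for the format-specific writers that Iceberg already provides. None of these names are real NiFi or Iceberg classes, and the string output is a placeholder for actual file writing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in types for illustration; none of these are real NiFi or Iceberg classes.
final class ConvertOnceSketch {

    // Shared intermediate record (the role Iceberg's GenericRecord would play).
    static final class IntermediateRecord {
        final Map<String, Object> fields = new LinkedHashMap<>();
    }

    // A format-specific writer that already understands the intermediate record.
    interface FormatWriter {
        String write(IntermediateRecord record);
    }

    // The single conversion step the processor would own.
    static IntermediateRecord convert(Map<String, Object> sourceRecord) {
        IntermediateRecord record = new IntermediateRecord();
        record.fields.putAll(sourceRecord);
        return record;
    }

    // Placeholder writers: each one only tags the record with its format name.
    static FormatWriter writerFor(String format) {
        return record -> format + record.fields;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("id", 1);

        // Convert once, then hand the same record to any writer.
        IntermediateRecord record = convert(source);
        for (String format : new String[] {"AVRO", "PARQUET", "ORC"}) {
            System.out.println(writerFor(format).write(record));
        }
    }
}
```

The point of the design is that only `convert` depends on the source record model, so adding a file format would not require another conversion path.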




[GitHub] [nifi] abdelrahim-ahmad commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "abdelrahim-ahmad (via GitHub)" <gi...@apache.org>.
abdelrahim-ahmad commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1480364703

   Thank you very much, I was not aware of the NARs project. Is there any chance of this being included by default in upcoming versions, or will it stay in NARs?




[GitHub] [nifi] quydx commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "quydx (via GitHub)" <gi...@apache.org>.
quydx commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1709400059

   > @AbdelrahimKA The base NAR files don't contain any cloud-specific dependencies because of their size. You need to make a custom build of nifi-iceberg-bundle using either the **include-hadoop-aws** or the **include-hadoop-cloud-storage** profile to be able to use the processor with S3. The **include-hadoop-aws** profile contains only S3-specific dependencies, while the **include-hadoop-cloud-storage** profile additionally includes Azure- and GCP-related dependencies.
   
   Could you please give more details about this? How can we build the NAR file from nifi-iceberg-bundle using the **include-hadoop-aws** profile? Thanks.




[GitHub] [nifi] pvillard31 commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "pvillard31 (via GitHub)" <gi...@apache.org>.
pvillard31 commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1480063769

   Those NARs are not part of the convenience binary due to ASF rules around the binary size. You can download the NARs from Maven Central and drop them into NiFi:
   https://mvnrepository.com/artifact/org.apache.nifi/nifi-iceberg-processors-nar
   https://mvnrepository.com/artifact/org.apache.nifi/nifi-iceberg-services-nar




[GitHub] [nifi] mark-bathori commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
mark-bathori commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r982318544


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is responsible for schema traversal and data conversion between NiFi and Iceberg internal record structure.
+ */
+public class IcebergRecordConverter {
+
+    private final DataConverter<Record, GenericRecord> converter;
+
+    public GenericRecord convert(Record record) {
+        return converter.convert(record);
+    }
+
+    @SuppressWarnings("unchecked")
+    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
+        this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
+    }
+
+    private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
+
+        public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
+            return visit(schema, recordDataType, new IcebergSchemaVisitor(), new IcebergPartnerAccessors(fileFormat));
+        }
+
+        @Override
+        public DataConverter<?, ?> schema(Schema schema, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> field(Types.NestedField field, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> primitive(Type.PrimitiveType type, DataType dataType) {
+            if (type.typeId() != null) {
+                switch (type.typeId()) {
+                    case BOOLEAN:
+                    case INTEGER:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case DATE:
+                    case STRING:
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case TIME:
+                        return GenericDataConverters.TimeConverter.INSTANCE;
+                    case TIMESTAMP:
+                        Types.TimestampType timestampType = (Types.TimestampType) type;
+                        if (timestampType.shouldAdjustToUTC()) {
+                            return GenericDataConverters.TimestampWithTimezoneConverter.INSTANCE;
+                        }
+                        return GenericDataConverters.TimestampConverter.INSTANCE;
+                    case UUID:
+                        UUIDDataType uuidType = (UUIDDataType) dataType;
+                        if (uuidType.getFileFormat() == FileFormat.PARQUET) {
+                            return GenericDataConverters.UUIDtoByteArrayConverter.INSTANCE;
+                        }
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case FIXED:
+                        Types.FixedType fixedType = (Types.FixedType) type;
+                        return new GenericDataConverters.FixedConverter(fixedType.length());
+                    case BINARY:
+                        return GenericDataConverters.BinaryConverter.INSTANCE;
+                    case DECIMAL:
+                        Types.DecimalType decimalType = (Types.DecimalType) type;
+                        return new GenericDataConverters.BigDecimalConverter(decimalType.precision(), decimalType.scale());
+                    default:
+                        throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
+                }
+            }
+            return null;

Review Comment:
   The overridden method returns null by default, which is why I followed the same approach for the case where the `PrimitiveType` doesn't have a type id.





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r982217966


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. " +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile.")
+})
+public class PutIceberg extends AbstractIcebergProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog Namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Table Name")
+            .description("The name of the table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FILE_FORMAT = new PropertyDescriptor.Builder()
+            .name("file-format")
+            .displayName("File Format")
+            .description("File format to use when writing Iceberg data files. If not set the 'write.format.default' table property will be used, default value is parquet.")
+            .allowableValues(
+                    new AllowableValue("AVRO"),
+                    new AllowableValue("PARQUET"),
+                    new AllowableValue("ORC"))
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor TARGET_FILE_SIZE = new PropertyDescriptor.Builder()
+            .name("target-file-size")
+            .displayName("Target File Size")
+            .description("Controls the size of files generated to target about this many bytes. If not set the 'write.target-file-size-bytes' table property will be used, default value is 512 MB.")
+            .addValidator(StandardValidators.LONG_VALIDATOR)
+            .required(false)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            CATALOG_NAMESPACE,
+            TABLE_NAME,
+            FILE_FORMAT,
+            TARGET_FILE_SIZE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final String fileFormat = context.getProperty(FILE_FORMAT).evaluateAttributeExpressions().getValue();
+        final String targetFileSize = context.getProperty(TARGET_FILE_SIZE).evaluateAttributeExpressions().getValue();
+
+        Table table;
+
+        try {
+            table = loadTable(context);
+        } catch (Exception e) {
+            getLogger().error("Failed to load table from catalog", e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final FileFormat format = getFileFormat(table.properties(), fileFormat);

Review Comment:
   What happens when this method throws an exception? In a more severe scenario (failed to load table), the flow file will be routed to failure. In this situation, a process exception will be thrown when this method is run as a privileged action, and if not, the original exception will penalize the processor.
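
One possible way to make this consistent with the table-loading case is sketched below, under stated assumptions. The fallback order (processor property, then the `write.format.default` table property, then parquet) is taken from the `FILE_FORMAT` property description. The `Format` enum, `resolveFormat` and `route` are hypothetical stand-ins rather than the PR's `getFileFormat`, and the returned strings only name the relationship a FlowFile would be sent to.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-ins for illustration; not the PR's getFileFormat or NiFi API.
final class FileFormatRoutingSketch {

    enum Format { AVRO, PARQUET, ORC }

    // Table property named in the FILE_FORMAT property description.
    static final String DEFAULT_FORMAT_PROPERTY = "write.format.default";

    // The processor property wins, then the table property, then parquet.
    static Format resolveFormat(Map<String, String> tableProperties, String processorValue) {
        String name = processorValue != null
                ? processorValue
                : tableProperties.getOrDefault(DEFAULT_FORMAT_PROPERTY, "parquet");
        // valueOf throws IllegalArgumentException for an unknown name such as "csv".
        return Format.valueOf(name.toUpperCase(Locale.ENGLISH));
    }

    // Returns the name of the relationship the FlowFile would be routed to.
    static String route(Map<String, String> tableProperties, String processorValue) {
        try {
            resolveFormat(tableProperties, processorValue);
            return "success";
        } catch (IllegalArgumentException e) {
            // Handled like a failed table load: route to failure instead of rethrowing.
            return "failure";
        }
    }

    public static void main(String[] args) {
        System.out.println(route(Collections.emptyMap(), null));
        System.out.println(route(
                Collections.singletonMap(DEFAULT_FORMAT_PROPERTY, "csv"), null));
    }
}
```

Catching the exception next to the lookup would keep an invalid table property from surfacing differently depending on whether the code runs as a privileged action.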





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r991413169


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/KerberosAwareBaseProcessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A base processor class for Kerberos aware usage.
+ */
+public abstract class KerberosAwareBaseProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws IOException {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;
+            } catch (final KerberosLoginException e) {
+                getLogger().debug("Error logging out keytab user", e);

Review Comment:
   Good to know, thanks for the info!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mark-bathori commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
mark-bathori commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r982311993


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/KerberosAwareBaseProcessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A base processor class for Kerberos aware usage.
+ */
+public abstract class KerberosAwareBaseProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws IOException {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;
+            } catch (final KerberosLoginException e) {
+                getLogger().debug("Error logging out keytab user", e);

Review Comment:
   The `getUgiForKerberosUser` method in `AbstractIcebergProcessor` checks the Kerberos TGT and logs the user in again if necessary.
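
The check-and-relogin idea described in this comment can be illustrated with a minimal, self-contained sketch. It does not reproduce `getUgiForKerberosUser` or NiFi's `KerberosUser`: the `FakeKerberosUser` class, the one-hour ticket lifetime, and the `renewWindowMillis` parameter are all assumptions made only for illustration.

```java
public class TgtReloginSketch {

    /** Hypothetical stand-in for a logged-in Kerberos user; not NiFi's KerberosUser. */
    static class FakeKerberosUser {
        private long ticketExpiryMillis;
        private int loginCount = 0;

        FakeKerberosUser(long ticketExpiryMillis) {
            this.ticketExpiryMillis = ticketExpiryMillis;
        }

        /** Pretends to obtain a new ticket that is valid for one more hour (assumed lifetime). */
        void relogin(long nowMillis) {
            ticketExpiryMillis = nowMillis + 3_600_000L;
            loginCount++;
        }

        long ticketExpiryMillis() {
            return ticketExpiryMillis;
        }

        int loginCount() {
            return loginCount;
        }
    }

    /**
     * Logs the user in again only when the ticket has less than
     * renewWindowMillis left. Returns true if a relogin happened.
     */
    static boolean checkTicketAndRelogin(FakeKerberosUser user, long nowMillis, long renewWindowMillis) {
        if (user.ticketExpiryMillis() - nowMillis < renewWindowMillis) {
            user.relogin(nowMillis);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FakeKerberosUser user = new FakeKerberosUser(10_000L);
        // Plenty of ticket lifetime left: no relogin.
        System.out.println(checkTicketAndRelogin(user, 0L, 1_000L));
        // Ticket is about to expire: relogin is triggered.
        System.out.println(checkTicketAndRelogin(user, 9_500L, 1_000L));
    }
}
```

Running the check before each unit of work, rather than only at schedule time, is what keeps a long-running processor from failing once the initial ticket expires.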





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r992154971


##########
nifi-assembly/pom.xml:
##########
@@ -1528,5 +1528,31 @@ language governing permissions and limitations under the License. -->
                 </plugins>
             </build>
         </profile>
+        <profile>

Review Comment:
   @nandorsoma My understanding is that the profile only affects the assembly, that is, which NARs go into the NiFi bin.zip. The Iceberg NAR bundle is built regardless of this profile and the tests should run.
   @mark-bathori, can you please confirm?





[GitHub] [nifi] asfgit closed pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #6368: NIFI-10442: Create PutIceberg processor
URL: https://github.com/apache/nifi/pull/6368




[GitHub] [nifi] MohamedAdelHsn commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "MohamedAdelHsn (via GitHub)" <gi...@apache.org>.
MohamedAdelHsn commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1543468631

   @AbdelrahimKA Hi, I hope you are doing well. Did you find a solution for that issue?
   
   




[GitHub] [nifi] turcsanyip commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r968617495


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);

Review Comment:
   `IcebergFileCommitter` has a single short method and does not have state. The method could be moved here to the processor class (e.g. `saveTable()`).



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/KerberosAwareBaseProcessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A base processor class for Kerberos aware usage.
+ */
+public abstract class KerberosAwareBaseProcessor extends AbstractProcessor {

Review Comment:
   Please rename it to `AbstractIcebergProcessor` because additional (non-kerberos) logic may be added in the future.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergTaskWriterFactory.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.nifi.processors.iceberg.appender.IcebergFileAppenderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.Locale;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
+/**
+ * Factory class to create the suitable {@link TaskWriter} based on the {@link Table}'s properties
+ */
+public class IcebergTaskWriterFactory {
+    private final Table table;
+    private final Schema schema;
+    private final PartitionSpec spec;
+    private final FileIO io;
+    private long targetFileSize;
+    private FileFormat format;
+
+    private FileAppenderFactory<Record> appenderFactory;
+    private OutputFileFactory outputFileFactory;
+    private RecordSchema recordSchema;
+
+    public IcebergTaskWriterFactory(Table table, RecordSchema recordSchema) {
+        this.table = table;
+        this.schema = table.schema();
+        this.spec = table.spec();
+        this.io = table.io();
+        this.recordSchema = recordSchema;
+    }
+
+    public void initialize(int partitionId, long attemptId) {

Review Comment:
   `partitionId` does not seem to be necessary because the caller calculates it from `table`, which is available here.
   
   `attemptId` => `taskId`
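
A minimal sketch of the signature change proposed in this comment, assuming the factory keeps its reference to the table. `TableStub` and `WriterFactory` are hypothetical stand-ins, not the Iceberg `Table` or the PR's `IcebergTaskWriterFactory`; the real code would read the id via `table.spec().specId()`, as the caller does today.

```java
public class TaskWriterFactorySketch {

    /** Hypothetical stand-in for the Iceberg table; exposes only the partition spec id. */
    static class TableStub {
        private final int specId;

        TableStub(int specId) {
            this.specId = specId;
        }

        int specId() {
            return specId;
        }
    }

    /** Hypothetical stand-in for the task writer factory under review. */
    static class WriterFactory {
        private final TableStub table;
        private int partitionId;
        private long taskId;

        WriterFactory(TableStub table) {
            this.table = table;
        }

        /** Only the task id is passed in; the partition id is derived from the table the factory already holds. */
        void initialize(long taskId) {
            this.partitionId = table.specId();
            this.taskId = taskId;
        }

        String describe() {
            return "partitionId=" + partitionId + ", taskId=" + taskId;
        }
    }

    public static void main(String[] args) {
        WriterFactory factory = new WriterFactory(new TableStub(0));
        factory.initialize(42L);
        System.out.println(factory.describe());
    }
}
```

With this shape the caller can no longer pass a partition id that disagrees with the table the factory was built from.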



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+            getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files.", e);
+            try {
+                if (taskWriterFactory != null) {
+                    taskWriter.abort();
+                }

Review Comment:
   ```suggestion
                   if (taskWriter != null) {
                       taskWriter.abort();
                   }
   ```
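
The guard matters because `taskWriterFactory` is assigned before `taskWriter`. Should a caught exception ever be raised between the two assignments, the factory is non-null while the writer is still null, and the cleanup would call `abort()` on a null reference. A minimal, self-contained sketch of that failure mode follows; `Writer` and `Factory` are hypothetical stand-ins for the Iceberg and PR classes, and the failing `create()` is simulated.

```java
import java.io.IOException;

public class AbortGuardSketch {

    /** Hypothetical stand-in for a task writer; only abort() matters here. */
    interface Writer {
        void abort() throws IOException;
    }

    /** Hypothetical stand-in for the writer factory; create() fails on purpose. */
    static class Factory {
        Writer create() throws IOException {
            throw new IOException("simulated failure while creating the writer");
        }
    }

    /** Runs the write path and reports what the cleanup branch did. */
    static String run(boolean guardOnWriter) {
        Factory factory = null;
        Writer writer = null;
        try {
            factory = new Factory();
            writer = factory.create(); // throws, so writer stays null
            return "written";
        } catch (IOException e) {
            try {
                // guardOnWriter == false mimics the guard on the factory;
                // guardOnWriter == true mimics the suggested guard on the writer.
                boolean shouldAbort = guardOnWriter ? writer != null : factory != null;
                if (shouldAbort) {
                    writer.abort();
                    return "aborted";
                }
                return "nothing to abort";
            } catch (IOException | NullPointerException cleanupFailure) {
                return "cleanup failed: " + cleanupFailure.getClass().getSimpleName();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // guard on the factory
        System.out.println(run(true));  // guard on the writer
    }
}
```

Checking the object that is actually dereferenced keeps the cleanup path from masking the original error with a `NullPointerException`.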



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());

Review Comment:
   `IcebergTaskWriterFactory`'s constructor and `initialize()` could be merged, at least from the client's point of view. `initialize()` can still be a separate private method in its class.
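
A minimal sketch of the suggestion above, assuming a simplified stand-in class: `SketchTaskWriterFactory`, its fields, and `outputFilePrefix()` are hypothetical names for illustration and are not the PR's actual `IcebergTaskWriterFactory` API. The point is only that the constructor receives everything it needs and calls a private `initialize()` itself, so callers never hold a half-initialized object.

```java
import java.util.Objects;

// Hypothetical stand-in for the factory discussed in the review comment;
// the fields and methods here are illustrative assumptions only.
final class SketchTaskWriterFactory {
    private final String tableName;
    private final int specId;
    private final long taskId;
    private String outputFilePrefix;

    // The constructor takes all required inputs and completes initialization,
    // so a separate public initialize() call is no longer needed by clients.
    SketchTaskWriterFactory(String tableName, int specId, long taskId) {
        this.tableName = Objects.requireNonNull(tableName, "tableName");
        this.specId = specId;
        this.taskId = taskId;
        initialize();
    }

    // Still a separate method for readability, but private to the class.
    private void initialize() {
        this.outputFilePrefix = tableName + "-" + specId + "-" + taskId;
    }

    String outputFilePrefix() {
        return outputFilePrefix;
    }
}

class SketchTaskWriterFactoryDemo {
    public static void main(String[] args) {
        // One call replaces the "new ..." plus "initialize(...)" pair.
        SketchTaskWriterFactory factory = new SketchTaskWriterFactory("events", 0, 42L);
        System.out.println(factory.outputFilePrefix());
    }
}
```

With this shape the call site in the processor would shrink to a single constructor call, and the ordering requirement between construction and initialization disappears.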





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r965057634


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Catalog service implementation that can use HDFS or similar file systems that support atomic rename.
+ */
+public class HadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
+
+    static final PropertyDescriptor WAREHOUSE_PATH = new PropertyDescriptor.Builder()
+            .name("warehouse-path")
+            .displayName("Warehouse path")

Review Comment:
   Please use title case:
   ```suggestion
               .displayName("Warehouse Path")
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+@Tags({"iceberg", "catalog", "service"})
+@CapabilityDescription("Provides a basic connector to Iceberg catalog services.")

Review Comment:
   These annotations must be specified on the CS implementation class, not on the CS interface.
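
One plausible reason for this rule, offered as an assumption rather than a statement about NiFi's internals: in Java, type annotations placed on an interface are not visible through `getAnnotation` on an implementing class, even when the annotation is marked `@Inherited`. The sketch below uses a hypothetical stand-in annotation (`SketchDescription`) and hypothetical service types instead of the real NiFi annotations, so that it stays self-contained.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for a documentation annotation such as @CapabilityDescription.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited // only affects superclass inheritance, never implemented interfaces
@interface SketchDescription {
    String value();
}

// Annotation placed on the interface, as in the reviewed diff.
@SketchDescription("declared on the interface")
interface SketchCatalogService { }

// Implementation that relies on the interface's annotation.
class InterfaceOnlyService implements SketchCatalogService { }

// Implementation that carries the annotation itself, as the reviewer asks.
@SketchDescription("declared on the implementation")
class AnnotatedService implements SketchCatalogService { }

class AnnotationLookupDemo {
    // Returns the description visible on the given class, or null if there is none.
    static String describe(Class<?> type) {
        SketchDescription description = type.getAnnotation(SketchDescription.class);
        return description == null ? null : description.value();
    }

    public static void main(String[] args) {
        System.out.println(describe(InterfaceOnlyService.class));
        System.out.println(describe(AnnotatedService.class));
    }
}
```

A tool that reads annotations from the concrete class would find nothing on `InterfaceOnlyService`, which is consistent with the request to move `@Tags` and `@CapabilityDescription` to the controller service implementation.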



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")

Review Comment:
   Typo: missing spaces where the string is concatenated
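
To illustrate the typo noted above, here is a small sketch using two shortened sentences from the quoted description. The class and constant names are hypothetical. Java joins adjacent string literals exactly as written, so a sentence that ends without a trailing space runs straight into the next one.

```java
class ConcatenationSpacingDemo {
    // As in the reviewed code: no space after the period before the next literal.
    static final String BROKEN = "The processor supports only V2 table format." +
            "The incoming data sets are parsed with Record Reader Controller Service.";

    // Fixed: each literal that is followed by another ends with a space.
    static final String FIXED = "The processor supports only V2 table format. " +
            "The incoming data sets are parsed with Record Reader Controller Service.";

    public static void main(String[] args) {
        System.out.println(BROKEN);
        System.out.println(FIXED);
    }
}
```

The same fix applies to every literal in the `@CapabilityDescription` value that is followed by another literal.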



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   Duplicated dependency. As far as I can see, `nifi-avro-record-utils` is needed only for tests so this occurrence should be deleted.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/NifiRecordWrapper.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.StructLike;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+
+/**
+ * Class to wrap and adapt {@link Record} to Iceberg {@link StructLike} for partition handling usage like {@link
+ * org.apache.iceberg.PartitionKey#partition(StructLike)}
+ */
+public class NifiRecordWrapper implements StructLike {

Review Comment:
   ```suggestion
   public class NiFiRecordWrapper implements StructLike {
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   `*-services-api` dependencies typically come from the parent `*-services-api-nar` and should be `provided` in the processor module. This one is declared as `provided` in `nifi-nar-bundles` pom so the scope should be omitted here.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-record-serialization-service-api</artifactId>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   `compile` is the default scope and does not need to be declared explicitly.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-utils</artifactId>
               <version>1.18.0-SNAPSHOT</version>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>

Review Comment:
   `jcl-over-slf4j` does not seem to be needed at compile time, and it will be provided in NiFi's `lib` directory at runtime, so it can be removed.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>

Review Comment:
   Could you please separate the production and test dependencies into two groups? Production ones first, then test ones.
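   As a rough sketch of the grouping, using artifacts already declared in this pom (the XML comments and the explicit `test` scope on the mock artifacts are assumptions for illustration, not part of the PR):
   ```xml
   <dependencies>
       <!-- Production dependencies -->
       <dependency>
           <groupId>org.apache.iceberg</groupId>
           <artifactId>iceberg-core</artifactId>
           <version>${iceberg.version}</version>
       </dependency>
       <dependency>
           <groupId>org.apache.nifi</groupId>
           <artifactId>nifi-iceberg-services-api</artifactId>
           <version>1.18.0-SNAPSHOT</version>
       </dependency>

       <!-- Test dependencies (explicit scope is an assumption; it may already be managed by a parent pom) -->
       <dependency>
           <groupId>org.apache.nifi</groupId>
           <artifactId>nifi-mock</artifactId>
           <scope>test</scope>
       </dependency>
       <dependency>
           <groupId>org.apache.nifi</groupId>
           <artifactId>nifi-mock-record-utils</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>
   ```
   Keeping the test block last makes it easier to see at a glance what ends up in the NAR.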



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")

Review Comment:
   ```suggestion
               .displayName("Catalog Namespace")
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   `nifi-api` is always provided in NiFi's `lib` directory. The `provided` scope is declared in the `nifi-nar-bundles` pom (`dependencyManagement` section) and it should not be changed:
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-api</artifactId>
           </dependency>
   ```
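   For context, a minimal sketch of how a managed scope is inherited. The parent entry below is an assumption about what the `dependencyManagement` section looks like, not a quote from the `nifi-nar-bundles` pom:
   ```xml
   <!-- Parent pom (illustrative): version and scope are managed centrally -->
   <dependencyManagement>
       <dependencies>
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-api</artifactId>
               <version>1.18.0-SNAPSHOT</version>
               <scope>provided</scope>
           </dependency>
       </dependencies>
   </dependencyManagement>

   <!-- Child pom: no version or scope here, so the managed `provided` scope applies -->
   <dependencies>
       <dependency>
           <groupId>org.apache.nifi</groupId>
           <artifactId>nifi-api</artifactId>
       </dependency>
   </dependencies>
   ```
   An explicit `<scope>compile</scope>` in the child overrides the managed value, which would likely cause the jar to be bundled instead of being picked up from `lib`.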



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information." +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 'write.format.default' table property, if it is not provided then parquet format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")

Review Comment:
   The `Iceberg` prefix is not needed:
   ```suggestion
               .displayName("Table Name")
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   `nifi-record` is also `provided`: its scope is already declared in the `nifi-nar-bundles` pom, so it does not need to be set here.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-record</artifactId>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-kerberos-user-service-api</artifactId>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos-api</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-security-kerberos-api</artifactId>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   `compile` is the default scope and does not need to be declared explicitly.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-security-kerberos</artifactId>
               <version>1.18.0-SNAPSHOT</version>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format. " +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files is defined in the 'write.format.default' table property, if it is not provided then parquet format will be used. " +
+        "To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,

Review Comment:
   I'd suggest the following order:
   - CATALOG
   - CATALOG_NAMESPACE
   - TABLE_NAME
   
   If I understand it correctly, the namespace belongs to the catalog and is used as a prefix for the table name.
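   
   A minimal sketch of the suggested ordering, using plain strings in place of the real `PropertyDescriptor` constants. The class `PropertyOrderSketch` and the helper `qualifiedTableName` are hypothetical and only illustrate the assumption that the namespace qualifies the table name; the dot separator is also an assumption, and the processor itself may build the identifier differently.
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   
   // Hypothetical illustration only; not part of the PR.
   class PropertyOrderSketch {
   
       // Property names taken from the descriptors in the diff, listed in the
       // suggested order: catalog, then its namespace, then the table name.
       static final List<String> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
               "record-reader",
               "catalog-service",
               "catalog-namespace",
               "table-name"));
   
       // Assumption: the namespace acts as a prefix for the table name.
       static String qualifiedTableName(String namespace, String tableName) {
           return namespace + "." + tableName;
       }
   
       public static void main(String[] args) {
           System.out.println(PROPERTIES);
           System.out.println(qualifiedTableName("default", "events"));
       }
   }
   ```
   
   Listing the namespace directly after the catalog keeps the properties in the same order in which they are combined to locate the table.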



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml:
##########
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-iceberg-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>3.1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-iceberg-services-api</artifactId>
               <version>1.18.0-SNAPSHOT</version>
               <scope>provided</scope>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format. " +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files is defined in the 'write.format.default' table property; if it is not provided, the Parquet format will be used. " +
+        "To avoid the 'small file problem', it is recommended to place a MergeRecord processor before this one.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);

Review Comment:
   This validation logic involves client-server communication, so it would fit better in the `VerifiableProcessor.verify()` method.
   Please see the discussion of "validation" vs. "verification" in `VerifiableProcessor`'s [documentation](https://github.com/apache/nifi/blob/5b565679dfca1d1fb1134a9a3c819c11587fef40/nifi-api/src/main/java/org/apache/nifi/processor/VerifiableProcessor.java#L26-L63).
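
   To illustrate the split suggested above, here is a minimal, dependency-free sketch. It is not the NiFi or Iceberg API: the class `ValidationVsVerification`, the `TableLookup` interface, and both method names are hypothetical stand-ins. The idea is that validation stays local and cheap, while verification runs on demand and is allowed to contact the catalog (in NiFi, that second role is the one `VerifiableProcessor.verify()` is meant for).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Sketch of "validation" vs "verification".
 * Every name in this class is hypothetical; nothing here is NiFi or Iceberg API.
 */
final class ValidationVsVerification {

    /** Hypothetical remote lookup: qualified table name -> format version, or null if the table is missing. */
    interface TableLookup extends Function<String, Integer> { }

    /** Validation: local checks only, so it is safe to run on every configuration change. */
    static List<String> validate(String namespace, String tableName) {
        final List<String> problems = new ArrayList<>();
        if (namespace == null || namespace.trim().isEmpty()) {
            problems.add("Catalog namespace must not be blank");
        }
        if (tableName == null || tableName.trim().isEmpty()) {
            problems.add("Table name must not be blank");
        }
        return problems;
    }

    /** Verification: runs on demand and may perform client-server communication. */
    static List<String> verify(String namespace, String tableName, TableLookup lookup) {
        final List<String> problems = new ArrayList<>(validate(namespace, tableName));
        if (!problems.isEmpty()) {
            // Skip the remote call when the local configuration is already invalid.
            return problems;
        }
        final Integer formatVersion = lookup.apply(namespace + "." + tableName);
        if (formatVersion == null) {
            problems.add("Table could not be loaded from the catalog");
        } else if (formatVersion < 2) {
            problems.add("V1 table format is not supported");
        }
        return problems;
    }
}
```

   With this split, the table format check from `customValidate` would move into the on-demand path, and `customValidate` would keep only the checks that need no remote call.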



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mark-bathori commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
mark-bathori commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r982306163


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The processor supports only V2 table format. " +
+        "The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
+        "It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files is defined in the 'write.format.default' table property; if it is not provided, the Parquet format will be used. " +
+        "To avoid the 'small file problem', it is recommended to place a MergeRecord processor before this one.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && validationContext.getProperty(CATALOG_NAMESPACE).isSet() && validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new ValidationResult.Builder().explanation("The provided table has incompatible table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+            getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files.", e);
+            try {
+                if (taskWriter != null) {
+                    taskWriter.abort();
+                }
+            } catch (IOException ex) {
+                throw new ProcessException("Failed to abort uncommitted data files.", ex);
+            }
+
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+
+    private Table initializeTable(PropertyContext context) {
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+
+        final Catalog catalog = catalogService.getCatalog();
+
+        final Namespace namespace = Namespace.of(catalogNamespace);
+        final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
+
+        return catalog.loadTable(tableIdentifier);

Review Comment:
   There is only 1 call in the current version since the table version validation was removed.





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r991430356


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.createFieldGetter;
+
+/**
+ * Data converter implementations for different data types.
+ */
+public class GenericDataConverters {
+
+    static class SameTypeConverter implements DataConverter<Object, Object> {
+
+        static final SameTypeConverter INSTANCE = new SameTypeConverter();
+
+        @Override
+        public Object convert(Object data) {
+            return data;
+        }
+    }
+
+    static class TimeConverter implements DataConverter<Time, LocalTime> {
+
+        static final TimeConverter INSTANCE = new TimeConverter();
+
+        @Override
+        public LocalTime convert(Time data) {
+            return data.toLocalTime();
+        }
+    }
+
+    static class TimestampConverter implements DataConverter<Timestamp, LocalDateTime> {
+
+        static final TimestampConverter INSTANCE = new TimestampConverter();
+
+        @Override
+        public LocalDateTime convert(Timestamp data) {
+            return data.toLocalDateTime();
+        }
+    }
+
+    static class TimestampWithTimezoneConverter implements DataConverter<Timestamp, OffsetDateTime> {
+
+        static final TimestampWithTimezoneConverter INSTANCE = new TimestampWithTimezoneConverter();
+
+        @Override
+        public OffsetDateTime convert(Timestamp data) {
+            return OffsetDateTime.ofInstant(data.toInstant(), ZoneId.of("UTC"));
+        }
+    }
+
+    static class UUIDtoByteArrayConverter implements DataConverter<UUID, byte[]> {
+
+        static final UUIDtoByteArrayConverter INSTANCE = new UUIDtoByteArrayConverter();
+
+        @Override
+        public byte[] convert(UUID data) {
+            ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+            byteBuffer.putLong(data.getMostSignificantBits());
+            byteBuffer.putLong(data.getLeastSignificantBits());
+            return byteBuffer.array();
+        }
+    }
+
+    static class FixedConverter implements DataConverter<Byte[], byte[]> {
+
+        private final int length;
+
+        FixedConverter(int length) {
+            this.length = length;
+        }
+
+        @Override
+        public byte[] convert(Byte[] data) {
+            Validate.isTrue(data.length == length, String.format("Cannot write byte array of length %s as fixed[%s]", data.length, length));
+            return ArrayUtils.toPrimitive(data);
+        }
+    }
+
+    static class BinaryConverter implements DataConverter<Byte[], ByteBuffer> {
+
+        static final BinaryConverter INSTANCE = new BinaryConverter();
+
+        @Override
+        public ByteBuffer convert(Byte[] data) {
+            return ByteBuffer.wrap(ArrayUtils.toPrimitive(data));
+        }
+    }
+
+    static class BigDecimalConverter implements DataConverter<BigDecimal, BigDecimal> {
+
+        private final int precision;
+        private final int scale;
+
+        BigDecimalConverter(int precision, int scale) {
+            this.precision = precision;
+            this.scale = scale;
+        }
+
+        @Override
+        public BigDecimal convert(BigDecimal data) {
+            Validate.isTrue(data.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
+            Validate.isTrue(data.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision: %s", precision, scale, data);
+            return data;
+        }
+    }
+
+    static class ArrayConverter<T, S> implements DataConverter<T[], List<S>> {
+        private final DataConverter<T, S> fieldConverter;
+        private final ArrayElementGetter.ElementGetter elementGetter;
+
+        ArrayConverter(DataConverter<T, S> elementConverter, DataType dataType) {
+            this.fieldConverter = elementConverter;
+            this.elementGetter = ArrayElementGetter.createElementGetter(dataType);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public List<S> convert(T[] data) {
+            final int numElements = data.length;
+            List<S> result = new ArrayList<>(numElements);
+            for (int i = 0; i < numElements; i += 1) {
+                result.add(i, fieldConverter.convert((T) elementGetter.getElementOrNull(data, i)));
+            }
+            return result;
+        }
+    }
+
+    static class MapConverter<K, V, L, B> implements DataConverter<Map<K, V>, Map<L, B>> {
+        private final DataConverter<K, L> keyConverter;
+        private final DataConverter<V, B> valueConverter;
+        private final ArrayElementGetter.ElementGetter keyGetter;
+        private final ArrayElementGetter.ElementGetter valueGetter;
+
+        MapConverter(DataConverter<K, L> keyConverter, DataType keyType, DataConverter<V, B> valueConverter, DataType valueType) {
+            this.keyConverter = keyConverter;
+            this.keyGetter = ArrayElementGetter.createElementGetter(keyType);
+            this.valueConverter = valueConverter;
+            this.valueGetter = ArrayElementGetter.createElementGetter(valueType);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public Map<L, B> convert(Map<K, V> data) {
+            final int mapSize = data.size();
+            final Object[] keyArray = data.keySet().toArray();
+            final Object[] valueArray = data.values().toArray();
+            Map<L, B> result = new HashMap<>(mapSize);
+            for (int i = 0; i < mapSize; i += 1) {
+                result.put(keyConverter.convert((K) keyGetter.getElementOrNull(keyArray, i)), valueConverter.convert((V) valueGetter.getElementOrNull(valueArray, i)));
+            }
+
+            return result;
+        }
+    }
+
+    static class RecordConverter implements DataConverter<Record, GenericRecord> {
+
+        private final DataConverter<?, ?>[] converters;
+        private final RecordFieldGetter.FieldGetter[] getters;
+
+        private final Types.StructType schema;
+
+        RecordConverter(List<DataConverter<?, ?>> converters, List<RecordField> recordFields, Types.StructType schema) {
+            this.schema = schema;
+            this.converters = (DataConverter<?, ?>[]) Array.newInstance(DataConverter.class, converters.size());
+            this.getters = new RecordFieldGetter.FieldGetter[converters.size()];
+            for (int i = 0; i < converters.size(); i += 1) {
+                final RecordField recordField = recordFields.get(i);
+                this.converters[i] = converters.get(i);
+                this.getters[i] = createFieldGetter(recordField.getDataType(), recordField.getFieldName(), recordField.isNullable());
+            }
+        }
+
+        @Override
+        public GenericRecord convert(Record data) {
+            final GenericRecord template = GenericRecord.create(schema);
+            // GenericRecord.copy() is more performant than GenericRecord.create(StructType) since NAME_MAP_CACHE access is eliminated. Using copy here to gain performance.
+            GenericRecord result = template.copy();

Review Comment:
   missing final



##########
nifi-assembly/pom.xml:
##########
@@ -1528,5 +1528,31 @@ language governing permissions and limitations under the License. -->
                 </plugins>
             </build>
         </profile>
+        <profile>

Review Comment:
   I understand that we need a separate profile for Iceberg because the resulting NARs became too big. However, this means that none of the GitHub CI jobs will include it, which is risky. It would probably be overkill to add it to all CI jobs, but would it be possible to run the tests in one of them? I'm okay with handling or discussing it in a separate PR. @turcsanyip



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+import static org.apache.nifi.processors.iceberg.PutIceberg.REL_FAILURE;
+
+/**
+ * Base Iceberg processor class.
+ */
+public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.")
+            .identifiesControllerService(KerberosUserService.class)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+    private volatile UserGroupInformation ugi;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) {
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+            try {
+                this.ugi = getUgiForKerberosUser(catalogService.getConfiguration(), kerberosUser);
+            } catch (IOException e) {
+                throw new ProcessException("Kerberos Authentication failed", e);
+            }
+        }
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+            } catch (KerberosLoginException e) {
+                getLogger().error("Error logging out kerberos user", e);
+            } finally {
+                kerberosUser = null;
+                ugi = null;
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();

Review Comment:
   Missing `final` modifier.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is responsible for schema traversal and data conversion between NiFi and Iceberg internal record structure.
+ */
+public class IcebergRecordConverter {
+
+    private final DataConverter<Record, GenericRecord> converter;
+
+    public GenericRecord convert(Record record) {
+        return converter.convert(record);
+    }
+
+    @SuppressWarnings("unchecked")
+    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
+        this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
+    }
+
+    private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
+
+        public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
+            return visit(schema, recordDataType, new IcebergSchemaVisitor(), new IcebergPartnerAccessors(fileFormat));
+        }
+
+        @Override
+        public DataConverter<?, ?> schema(Schema schema, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> field(Types.NestedField field, DataType dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> primitive(Type.PrimitiveType type, DataType dataType) {
+            if (type.typeId() != null) {
+                switch (type.typeId()) {
+                    case BOOLEAN:
+                    case INTEGER:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case DATE:
+                    case STRING:
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case TIME:
+                        return GenericDataConverters.TimeConverter.INSTANCE;
+                    case TIMESTAMP:
+                        Types.TimestampType timestampType = (Types.TimestampType) type;
+                        if (timestampType.shouldAdjustToUTC()) {
+                            return GenericDataConverters.TimestampWithTimezoneConverter.INSTANCE;
+                        }
+                        return GenericDataConverters.TimestampConverter.INSTANCE;
+                    case UUID:
+                        UUIDDataType uuidType = (UUIDDataType) dataType;
+                        if (uuidType.getFileFormat() == FileFormat.PARQUET) {
+                            return GenericDataConverters.UUIDtoByteArrayConverter.INSTANCE;
+                        }
+                        return GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case FIXED:
+                        Types.FixedType fixedType = (Types.FixedType) type;
+                        return new GenericDataConverters.FixedConverter(fixedType.length());
+                    case BINARY:
+                        return GenericDataConverters.BinaryConverter.INSTANCE;
+                    case DECIMAL:
+                        Types.DecimalType decimalType = (Types.DecimalType) type;
+                        return new GenericDataConverters.BigDecimalConverter(decimalType.precision(), decimalType.scale());
+                    default:
+                        throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
+                }
+            }
+            return null;

Review Comment:
   I'm still uncertain about this part. I think we should handle a nonexistent type and an unknown type the same way.
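
A minimal sketch of what handling both cases the same way could look like, assuming the goal is to throw `UnsupportedOperationException` for a missing type id as well as for an unrecognized one instead of returning `null`. The `TypeId` enum, the `converterNameFor` method, and the string labels below are hypothetical stand-ins used only for illustration; they are not the Iceberg or NiFi API.

```java
public class UnknownTypeHandlingSketch {

    // Hypothetical stand-in for the type id enum; only a few constants are listed.
    enum TypeId { STRING, TIME, OTHER }

    // Returns a label for the converter that would be selected (labels are hypothetical).
    static String converterNameFor(final TypeId typeId) {
        // Switching on a null enum would throw NullPointerException,
        // so the missing case is checked first and reported like an unknown type.
        if (typeId == null) {
            throw new UnsupportedOperationException("Unsupported type: null");
        }
        switch (typeId) {
            case STRING:
                return "same-type";
            case TIME:
                return "time";
            default:
                // Unknown types end up with the same exception type as the null case.
                throw new UnsupportedOperationException("Unsupported type: " + typeId);
        }
    }

    public static void main(final String[] args) {
        System.out.println(converterNameFor(TypeId.STRING));
        try {
            converterNameFor(null);
        } catch (final UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With this shape the caller never has to distinguish a `null` converter from a thrown exception; both problem cases surface as one exception type.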



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r992124333


##########
nifi-nar-bundles/nifi-iceberg-bundle/pom.xml:
##########
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>

Review Comment:
   @mark-bathori Please update the version to 1.19.0-SNAPSHOT and rebase your branch onto main.





[GitHub] [nifi] mark-bathori commented on a diff in pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by GitBox <gi...@apache.org>.
mark-bathori commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r992252822


##########
nifi-assembly/pom.xml:
##########
@@ -1528,5 +1528,31 @@ language governing permissions and limitations under the License. -->
                 </plugins>
             </build>
         </profile>
+        <profile>

Review Comment:
   Yes, I can see the Iceberg tests being executed after adding the profile.





[GitHub] [nifi] mark-bathori commented on pull request #6368: NIFI-10442: Create PutIceberg processor

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on PR #6368:
URL: https://github.com/apache/nifi/pull/6368#issuecomment-1502175258

   @AbdelrahimKA The base NAR files don't contain any cloud-specific dependencies due to their size.
   You need to make a custom build of nifi-iceberg-bundle using either the **include-hadoop-aws** or the **include-hadoop-cloud-storage** profile to be able to use the processor with S3. The **include-hadoop-aws** profile contains only S3-specific dependencies, while the **include-hadoop-cloud-storage** profile additionally includes Azure- and GCP-related dependencies.

