You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/02/15 17:33:37 UTC

[nifi] branch main updated: NIFI-11149 Added PutRedisHashRecord Processor

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c5ae0d098 NIFI-11149 Added PutRedisHashRecord Processor
9c5ae0d098 is described below

commit 9c5ae0d098d2d8bebf7042a0b7ab353c7a2aa347
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Feb 10 16:27:02 2023 -0500

    NIFI-11149 Added PutRedisHashRecord Processor
    
    This closes #6943
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi-redis-extensions/pom.xml                  |  20 ++
 .../nifi/redis/processor/PutRedisHashRecord.java   | 281 +++++++++++++++++++++
 .../RedisDistributedMapCacheClientService.java     |  20 +-
 .../org/apache/nifi/redis/util/RedisUtils.java     |  20 ++
 .../services/org.apache.nifi.processor.Processor   |  15 ++
 .../redis/processor/TestPutRedisHashRecord.java    | 217 ++++++++++++++++
 .../ITRedisDistributedMapCacheClientService.java   |   5 +-
 7 files changed, 559 insertions(+), 19 deletions(-)

diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
index f899b7e151..80baf8d1d2 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
@@ -68,6 +68,20 @@
             <artifactId>nifi-utils</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -75,6 +89,12 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.github.kstyrc</groupId>
             <artifactId>embedded-redis</artifactId>
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java
new file mode 100644
index 0000000000..ea11e9ebbe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified hash value, which is determined by a RecordPath to a field in each record containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash value. NOTE: Neither the evaluated hash value nor any of the field values can be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(
+                attribute = PutRedisHashRecord.SUCCESS_RECORD_COUNT,
+                description = "Number of records written to Redis")
+})
+public class PutRedisHashRecord extends AbstractProcessor {
+
+    public static final String SUCCESS_RECORD_COUNT = "redis.success.record.count";
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each Record in order to determine the hash value associated with all the record fields/values "
+                    + "(see 'hset' in Redis documentation for more details). The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The property defaults to the root '/' which corresponds to a 'flat' record (all fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing record field values as strings. All fields will be converted to strings using this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles having all Records stored in Redis will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles containing Records with processing errors will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new LinkedHashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        long count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             RedisConnection redisConnection = redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException(String.format("No results found for Record [%d] Hash Value Record Path: %s", count, hashValueRecordPath.getPath()));
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException(String.format("Multiple results [%d] found for Record [%d] Hash Value Record Path: %s", resultList.size(), count, hashValueRecordPath.getPath()));
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException(String.format("Null value found for Record [%d] Hash Value Record Path: %s", count, hashValueRecordPath.getPath()));
+                }
+                final String hashValue = (String) DataTypeUtils.convertType(hashValueObject, RecordFieldType.STRING.getDataType(), charset.name());
+
+                List<Record> dataRecords = getDataRecords(dataRecordPath, record);
+
+                count = putDataRecordsToRedis(dataRecords, redisConnection, hashValue, charset, count);
+            }
+
+        } catch (MalformedRecordException e) {
+            getLogger().error("Read Records failed {}", flowFile, e);
+            flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Record Schema not found {}", flowFile, e);
+            flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } catch (Exception e) {
+            getLogger().error("Put Records failed {}", flowFile, e);
+            flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    private List<Record> getDataRecords(final RecordPath dataRecordPath, final Record outerRecord) {
+        if (dataRecordPath == null) {
+            return Collections.singletonList(outerRecord);
+        }
+
+        final RecordPathResult result = dataRecordPath.evaluate(outerRecord);
+        final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
+        if (fieldValues.isEmpty()) {
+            throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
+        }
+
+        for (final FieldValue fieldValue : fieldValues) {
+            final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
+            if (fieldType != RecordFieldType.RECORD) {
+                throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
+                        " " + fieldType);
+            }
+        }
+
+        final List<Record> dataRecords = new ArrayList<>(fieldValues.size());
+        for (final FieldValue fieldValue : fieldValues) {
+            dataRecords.add((Record) fieldValue.getValue());
+        }
+
+        return dataRecords;
+    }
+
+    private long putDataRecordsToRedis(final List<Record> dataRecords, final RedisConnection redisConnection, final String hashValue, final Charset charset, final long originalCount) {
+        long count = originalCount;
+        for (Record dataRecord : dataRecords) {
+            RecordSchema dataRecordSchema = dataRecord.getSchema();
+
+            for (RecordField recordField : dataRecordSchema.getFields()) {
+                final String fieldName = recordField.getFieldName();
+                final Object value = dataRecord.getValue(fieldName);
+                if (fieldName == null || value == null) {
+                    getLogger().debug("Record field missing required elements: name [{}] value [{}]", fieldName, value);
+                } else {
+                    final String stringValue = (String) DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), charset.name());
+                    redisConnection.hashCommands().hSet(hashValue.getBytes(charset), fieldName.getBytes(charset), stringValue.getBytes(charset));
+                }
+            }
+            count++;
+        }
+        return count;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
index 7fd6c9b8b0..6a36d485c5 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
@@ -29,7 +29,6 @@ import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
 import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.redis.RedisConnectionPool;
 import org.apache.nifi.redis.RedisType;
 import org.apache.nifi.redis.util.RedisAction;
@@ -51,6 +50,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+import static org.apache.nifi.redis.util.RedisUtils.TTL;
+
 @Tags({ "redis", "distributed", "cache", "map" })
 @CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
         "the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " +
@@ -58,22 +60,6 @@ import java.util.concurrent.TimeUnit;
         "provide high-availability configurations.")
 public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
 
-    public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
-            .name("redis-connection-pool")
-            .displayName("Redis Connection Pool")
-            .identifiesControllerService(RedisConnectionPool.class)
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
-            .name("redis-cache-ttl")
-            .displayName("TTL")
-            .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .required(true)
-            .defaultValue("0 secs")
-            .build();
-
     static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
index dd0197726a..af9b61e7a0 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
@@ -24,6 +24,7 @@ import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.redis.RedisConnectionPool;
 import org.apache.nifi.redis.RedisType;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.util.StringUtils;
@@ -46,6 +47,25 @@ import java.util.concurrent.TimeUnit;
 
 public class RedisUtils {
 
+    // These properties are shared among the controller service(s) and processor(s) that use a RedisConnectionPool
+
+    public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
+            .name("redis-connection-pool")
+            .displayName("Redis Connection Pool")
+            .identifiesControllerService(RedisConnectionPool.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("redis-cache-ttl")
+            .displayName("TTL")
+            .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .defaultValue("0 secs")
+            .build();
+
+
     // These properties are shared between the connection pool controller service and the state provider, the name
     // is purposely set to be more human-readable since that will be referenced in state-management.xml
 
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..fbf989625e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.redis.processor.PutRedisHashRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisHashRecord.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisHashRecord.java
new file mode 100644
index 0000000000..12fd6b198a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisHashRecord.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+
+import org.apache.nifi.redis.service.RedisConnectionPoolService;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisHashCommands;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestPutRedisHashRecord {
+
+    private TestRunner runner;
+    private PutRedisHashRecord processor;
+    private MockRecordParser parser;
+
+    private MockRedisConnectionPoolService connectionPoolService;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        processor = new PutRedisHashRecord();
+        runner = TestRunners.newTestRunner(processor);
+        parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.setProperty(PutRedisHashRecord.RECORD_READER_FACTORY, "parser");
+
+        connectionPoolService = new MockRedisConnectionPoolService();
+        connectionPoolService.setFailAfterN(0);
+        try {
+            runner.addControllerService("connectionPool", connectionPoolService);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.setProperty(connectionPoolService, RedisUtils.CONNECTION_STRING, "localhost:6379");
+        runner.setProperty(RedisUtils.REDIS_CONNECTION_POOL, "connectionPool");
+        // Tests should provide a field named 'hash' with unique values per record, unless testing failure conditions
+        runner.setProperty(PutRedisHashRecord.HASH_VALUE_RECORD_PATH, "/hash");
+    }
+
+    @Test
+    public void testPutRecords() {
+        runner.assertNotValid();
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+        parser.addRecord("def", "Jane Doe", 47);
+        parser.addRecord("ghi", "Jimmy Doe", 14);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_SUCCESS, 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_SUCCESS);
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "3");
+        // Verify the content is untouched
+        ff.assertContentEquals("hello");
+    }
+
+    @Test
+    public void testPutRecordsFailAfterN() {
+        runner.assertNotValid();
+        connectionPoolService.setFailAfterN(5);
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+        parser.addRecord("def", "Jane Doe", 47);
+        parser.addRecord("ghi", "Jimmy Doe", 14);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_FAILURE, 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_FAILURE);
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "1");
+        // Verify the content is untouched
+        ff.assertContentEquals("hello");
+    }
+
+    @Test
+    public void testPutRecordsNoHash() {
+        runner.assertNotValid();
+        runner.setProperty(PutRedisHashRecord.HASH_VALUE_RECORD_PATH, "/invalid_path");
+        runner.assertValid(connectionPoolService);
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_FAILURE, 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_FAILURE);
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "0");
+        // Verify the content is untouched
+        ff.assertContentEquals("hello");
+    }
+
+    @Test
+    public void testPutRecordsNullValue() {
+        runner.assertNotValid();
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+        parser.addRecord("def", "Jane Doe", null);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        // FlowFile should be routed to success but with only one record
+        runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_SUCCESS, 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_SUCCESS);
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        // Both records are transferred successfully, but the null value was not put into Redis
+        ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "2");
+        // Verify the content is untouched
+        ff.assertContentEquals("hello");
+    }
+
+    private static class MockRedisConnectionPoolService extends RedisConnectionPoolService {
+
+        private final Map<String, Map<String, String>> hashStore = new HashMap<>();
+        private int failAfterN = 0;
+        private int currentFailures = 0;
+
+        @Override
+        public RedisConnection getConnection() {
+            currentFailures = 0;
+            RedisConnection mockRedisConnection = mock(RedisConnection.class);
+            RedisHashCommands hashCommands = mock(RedisHashCommands.class);
+            when(hashCommands.hSet(any(byte[].class), any(byte[].class), any(byte[].class))).thenAnswer((Answer<Boolean>) invocationOnMock -> {
+                currentFailures++;
+                if (failAfterN > 0 && currentFailures > failAfterN) {
+                    throw new UncheckedIOException(new IOException("error during hset"));
+                }
+                final byte[] hashValue = invocationOnMock.getArgument(0);
+                final byte[] keyValue = invocationOnMock.getArgument(1);
+                final byte[] valueValue = invocationOnMock.getArgument(2);
+
+                if (hashValue == null || keyValue == null || valueValue == null) {
+                    throw new NullPointerException("hash, key, and value must not be null");
+                }
+
+                final String hashString = new String(hashValue);
+
+                Map<String, String> kvMap = hashStore.get(hashString);
+                if (kvMap == null) {
+                    kvMap = new HashMap<>();
+                }
+                kvMap.put(new String(keyValue), new String(valueValue));
+                hashStore.put(hashString, kvMap);
+
+                return Boolean.TRUE;
+            });
+            when(mockRedisConnection.hashCommands()).thenReturn(hashCommands);
+            return mockRedisConnection;
+        }
+
+        public void setFailAfterN(int triesBeforeFailure) {
+            failAfterN = Math.max(triesBeforeFailure, 0);
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
index 191930c7de..aec018954e 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -125,7 +126,7 @@ public class ITRedisDistributedMapCacheClientService {
         // create, configure, and enable the RedisDistributedMapCacheClient service
         redisMapCacheClientService = new RedisDistributedMapCacheClientService();
         testRunner.addControllerService("redis-map-cache-client", redisMapCacheClientService);
-        testRunner.setProperty(redisMapCacheClientService, RedisDistributedMapCacheClientService.REDIS_CONNECTION_POOL, "redis-connection-pool");
+        testRunner.setProperty(redisMapCacheClientService, REDIS_CONNECTION_POOL, "redis-connection-pool");
         testRunner.enableControllerService(redisMapCacheClientService);
         testRunner.setProperty(TestRedisProcessor.REDIS_MAP_CACHE, "redis-map-cache-client");
     }
@@ -182,7 +183,7 @@ public class ITRedisDistributedMapCacheClientService {
                 final String key = "test-redis-processor-" + timestamp;
                 final String value = "the time is " + timestamp;
 
-                // verify the key doesn't exists, put the key/value, then verify it exists
+                // verify the key doesn't exist, put the key/value, then verify it exists
                 assertFalse(cacheClient.containsKey(key, stringSerializer));
                 cacheClient.put(key, value, stringSerializer, stringSerializer);
                 assertTrue(cacheClient.containsKey(key, stringSerializer));