You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/02/15 17:33:37 UTC
[nifi] branch main updated: NIFI-11149 Added PutRedisHashRecord Processor
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9c5ae0d098 NIFI-11149 Added PutRedisHashRecord Processor
9c5ae0d098 is described below
commit 9c5ae0d098d2d8bebf7042a0b7ab353c7a2aa347
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Feb 10 16:27:02 2023 -0500
NIFI-11149 Added PutRedisHashRecord Processor
This closes #6943
Signed-off-by: David Handermann <ex...@apache.org>
---
.../nifi-redis-extensions/pom.xml | 20 ++
.../nifi/redis/processor/PutRedisHashRecord.java | 281 +++++++++++++++++++++
.../RedisDistributedMapCacheClientService.java | 20 +-
.../org/apache/nifi/redis/util/RedisUtils.java | 20 ++
.../services/org.apache.nifi.processor.Processor | 15 ++
.../redis/processor/TestPutRedisHashRecord.java | 217 ++++++++++++++++
.../ITRedisDistributedMapCacheClientService.java | 5 +-
7 files changed, 559 insertions(+), 19 deletions(-)
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
index f899b7e151..80baf8d1d2 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
@@ -68,6 +68,20 @@
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -75,6 +89,12 @@
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java
new file mode 100644
index 0000000000..ea11e9ebbe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+/**
+ * Processor that reads Records from an incoming FlowFile and stores the fields of each Record in a Redis hash.
+ * <p>
+ * For every Record, the Hash Value Record Path is evaluated to obtain the Redis key of the hash, and the Data Record Path
+ * is evaluated to obtain one or more Records whose non-null fields are written with HSET as field/value pairs of that hash.
+ * The number of data Records written is recorded in the {@value #SUCCESS_RECORD_COUNT} attribute on both success and failure.
+ * Fields written before a failure occurs are not removed from Redis.
+ */
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified hash value, which is determined by a RecordPath to a field in each record containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash value. NOTE: Neither the evaluated hash value nor any of the field values can be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field values, if the value is null that field will not be set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(
+                attribute = PutRedisHashRecord.SUCCESS_RECORD_COUNT,
+                description = "Number of records written to Redis")
+})
+public class PutRedisHashRecord extends AbstractProcessor {
+
+    /** Name of the FlowFile attribute holding the number of data Records written to Redis. */
+    public static final String SUCCESS_RECORD_COUNT = "redis.success.record.count";
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each Record in order to determine the hash value associated with all the record fields/values "
+                    + "(see 'hset' in Redis documentation for more details). The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The property defaults to the root '/' which corresponds to a 'flat' record (all fields/values at the top level of" +
+                    " the Record).")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing record field values as strings. All fields will be converted to strings using this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles having all Records stored in Redis will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles containing Records with processing errors will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new LinkedHashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    // Assigned in onScheduled and cleared in onStopped; volatile because it is read from onTrigger.
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    /**
+     * Resolves the configured Redis Connection Pool controller service before the processor starts running.
+     *
+     * @param context the process context providing the configured property values
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    /**
+     * Drops the reference to the Redis Connection Pool when the processor is stopped.
+     */
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    /**
+     * Reads every Record of one FlowFile and writes its fields to Redis.
+     * <p>
+     * The FlowFile is routed to success when all Records were written, otherwise to failure. In both cases the
+     * {@value #SUCCESS_RECORD_COUNT} attribute is set to the number of data Records written before the method finished.
+     *
+     * @param context the process context providing the configured property values
+     * @param session the session used to read and transfer the FlowFile
+     * @throws ProcessException declared by the overridden method; failures raised while reading or writing Records are caught and routed to failure
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        long count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             RedisConnection redisConnection = redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException(String.format("No results found for Record [%d] Hash Value Record Path: %s", count, hashValueRecordPath.getPath()));
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException(String.format("Multiple results [%d] found for Record [%d] Hash Value Record Path: %s", resultList.size(), count, hashValueRecordPath.getPath()));
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException(String.format("Null value found for Record [%d] Hash Value Record Path: %s", count, hashValueRecordPath.getPath()));
+                }
+                // Pass the real field name (used in conversion error messages) and the configured charset;
+                // the three-argument overload would interpret its last argument as the field name.
+                final String hashFieldName = hashValueFieldValue.getField().getFieldName();
+                final String hashValue = (String) DataTypeUtils.convertType(hashValueObject, RecordFieldType.STRING.getDataType(), hashFieldName, charset);
+
+                final List<Record> dataRecords = getDataRecords(dataRecordPath, record);
+
+                count = putDataRecordsToRedis(dataRecords, redisConnection, hashValue, charset, count);
+            }
+
+        } catch (MalformedRecordException e) {
+            getLogger().error("Read Records failed {}", flowFile, e);
+            transferToFailure(session, flowFile, count);
+            return;
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Record Schema not found {}", flowFile, e);
+            transferToFailure(session, flowFile, count);
+            return;
+        } catch (Exception e) {
+            getLogger().error("Put Records failed {}", flowFile, e);
+            transferToFailure(session, flowFile, count);
+            return;
+        }
+
+        flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    /**
+     * Stamps the FlowFile with the number of Records written so far and routes it to the failure relationship.
+     */
+    private void transferToFailure(final ProcessSession session, final FlowFile flowFile, final long count) {
+        final FlowFile updated = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
+        session.transfer(updated, REL_FAILURE);
+    }
+
+    /**
+     * Evaluates the Data Record Path against a Record and returns the Records it selects.
+     *
+     * @param dataRecordPath the compiled Data Record Path, or {@code null} to use the outer Record itself
+     * @param outerRecord the Record read from the FlowFile
+     * @return the Records whose fields should be written to Redis
+     * @throws ProcessException if the path selects nothing, selects a field that is not of type RECORD, or selects a null Record
+     */
+    private List<Record> getDataRecords(final RecordPath dataRecordPath, final Record outerRecord) {
+        if (dataRecordPath == null) {
+            return Collections.singletonList(outerRecord);
+        }
+
+        final RecordPathResult result = dataRecordPath.evaluate(outerRecord);
+        final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
+        if (fieldValues.isEmpty()) {
+            throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
+        }
+
+        for (final FieldValue fieldValue : fieldValues) {
+            final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
+            if (fieldType != RecordFieldType.RECORD) {
+                throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
+                        " " + fieldType);
+            }
+        }
+
+        final List<Record> dataRecords = new ArrayList<>(fieldValues.size());
+        for (final FieldValue fieldValue : fieldValues) {
+            final Object value = fieldValue.getValue();
+            if (value == null) {
+                // Fail with a descriptive message instead of a NullPointerException when the schema is read later
+                throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record returned a null Record");
+            }
+            dataRecords.add((Record) value);
+        }
+
+        return dataRecords;
+    }
+
+    /**
+     * Writes every non-null field of the given Records to the Redis hash identified by {@code hashValue}.
+     *
+     * @param dataRecords the Records whose fields are written
+     * @param redisConnection the connection used to issue the HSET commands
+     * @param hashValue the Redis key of the hash
+     * @param charset the character set used to convert values to strings and to encode key, field names and values as bytes
+     * @param originalCount the number of Records written before this call
+     * @return {@code originalCount} plus the number of Records in {@code dataRecords}
+     */
+    private long putDataRecordsToRedis(final List<Record> dataRecords, final RedisConnection redisConnection, final String hashValue, final Charset charset, final long originalCount) {
+        long count = originalCount;
+        // The hash key is the same for every field, so encode it once
+        final byte[] hashKey = hashValue.getBytes(charset);
+        for (final Record dataRecord : dataRecords) {
+            final RecordSchema dataRecordSchema = dataRecord.getSchema();
+
+            for (final RecordField recordField : dataRecordSchema.getFields()) {
+                final String fieldName = recordField.getFieldName();
+                final Object value = dataRecord.getValue(fieldName);
+                if (fieldName == null || value == null) {
+                    getLogger().debug("Record field missing required elements: name [{}] value [{}]", fieldName, value);
+                } else {
+                    // Four-argument overload: field name for error reporting, charset for the actual conversion
+                    final String stringValue = (String) DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), fieldName, charset);
+                    redisConnection.hashCommands().hSet(hashKey, fieldName.getBytes(charset), stringValue.getBytes(charset));
+                }
+            }
+            count++;
+        }
+        return count;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
index 7fd6c9b8b0..6a36d485c5 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
@@ -29,7 +29,6 @@ import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction;
@@ -51,6 +50,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+import static org.apache.nifi.redis.util.RedisUtils.TTL;
+
@Tags({ "redis", "distributed", "cache", "map" })
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
"the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " +
@@ -58,22 +60,6 @@ import java.util.concurrent.TimeUnit;
"provide high-availability configurations.")
public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
- public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
- .name("redis-connection-pool")
- .displayName("Redis Connection Pool")
- .identifiesControllerService(RedisConnectionPool.class)
- .required(true)
- .build();
-
- public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
- .name("redis-cache-ttl")
- .displayName("TTL")
- .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .required(true)
- .defaultValue("0 secs")
- .build();
-
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
index dd0197726a..af9b61e7a0 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
@@ -24,6 +24,7 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.util.StringUtils;
@@ -46,6 +47,25 @@ import java.util.concurrent.TimeUnit;
public class RedisUtils {
+ // These properties are shared among the controller service(s) and processor(s) that use a RedisConnectionPool
+
+ public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
+ .name("redis-connection-pool")
+ .displayName("Redis Connection Pool")
+ .identifiesControllerService(RedisConnectionPool.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+ .name("redis-cache-ttl")
+ .displayName("TTL")
+ .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(true)
+ .defaultValue("0 secs")
+ .build();
+
+
// These properties are shared between the connection pool controller service and the state provider, the name
// is purposely set to be more human-readable since that will be referenced in state-management.xml
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..fbf989625e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.redis.processor.PutRedisHashRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisHashRecord.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisHashRecord.java
new file mode 100644
index 0000000000..12fd6b198a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisHashRecord.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+
+import org.apache.nifi.redis.service.RedisConnectionPoolService;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisHashCommands;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link PutRedisHashRecord} that run against a mocked Redis connection instead of a Redis server.
+ */
+class TestPutRedisHashRecord {
+
+    private TestRunner runner;
+    private PutRedisHashRecord processor;
+    private MockRecordParser parser;
+
+    private MockRedisConnectionPoolService connectionPoolService;
+
+    /**
+     * Creates the runner with a mock record parser and a mock connection pool. The connection pool is added but not
+     * enabled here; each test enables it after applying its own settings.
+     */
+    @BeforeEach
+    public void setup() throws Exception {
+        processor = new PutRedisHashRecord();
+        runner = TestRunners.newTestRunner(processor);
+        parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.setProperty(PutRedisHashRecord.RECORD_READER_FACTORY, "parser");
+
+        connectionPoolService = new MockRedisConnectionPoolService();
+        connectionPoolService.setFailAfterN(0);
+        try {
+            runner.addControllerService("connectionPool", connectionPoolService);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.setProperty(connectionPoolService, RedisUtils.CONNECTION_STRING, "localhost:6379");
+        runner.setProperty(RedisUtils.REDIS_CONNECTION_POOL, "connectionPool");
+        // Tests should provide a field named 'hash' with unique values per record, unless testing failure conditions
+        runner.setProperty(PutRedisHashRecord.HASH_VALUE_RECORD_PATH, "/hash");
+    }
+
+    @Test
+    public void testPutRecords() {
+        runner.assertNotValid();
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+        parser.addRecord("def", "Jane Doe", 47);
+        parser.addRecord("ghi", "Jimmy Doe", 14);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_SUCCESS, 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_SUCCESS);
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "3");
+        // Verify the content is untouched
+        ff.assertContentEquals("hello");
+    }
+
+    @Test
+    public void testPutRecordsFailAfterN() {
+        runner.assertNotValid();
+        // Each record has three fields, so the sixth hSet call (third field of the second record) fails
+        connectionPoolService.setFailAfterN(5);
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+        parser.addRecord("def", "Jane Doe", 47);
+        parser.addRecord("ghi", "Jimmy Doe", 14);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_FAILURE, 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_FAILURE);
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "1");
+        // Verify the content is untouched
+        ff.assertContentEquals("hello");
+    }
+
+    @Test
+    public void testPutRecordsNoHash() {
+        runner.assertNotValid();
+        runner.setProperty(PutRedisHashRecord.HASH_VALUE_RECORD_PATH, "/invalid_path");
+        runner.assertValid(connectionPoolService);
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_FAILURE, 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_FAILURE);
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "0");
+        // Verify the content is untouched
+        ff.assertContentEquals("hello");
+    }
+
+    @Test
+    public void testPutRecordsNullValue() {
+        runner.assertNotValid();
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+        parser.addRecord("def", "Jane Doe", null);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        // FlowFile should be routed to success with both records counted
+        runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_SUCCESS, 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_SUCCESS);
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        // Both records are transferred successfully, but the null value was not put into Redis
+        ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "2");
+        // Verify the content is untouched
+        ff.assertContentEquals("hello");
+    }
+
+    /**
+     * Connection pool whose connections store hSet calls in an in-memory map and can be configured to fail
+     * once a given number of hSet calls has been made on a connection.
+     */
+    private static class MockRedisConnectionPoolService extends RedisConnectionPoolService {
+
+        private final Map<String, Map<String, String>> hashStore = new HashMap<>();
+        // Number of hSet calls allowed per connection before failing; 0 disables failure injection
+        private int failAfterN = 0;
+        // Number of hSet calls made on the most recently obtained connection
+        private int invocationCount = 0;
+
+        @Override
+        public RedisConnection getConnection() {
+            invocationCount = 0;
+            RedisConnection mockRedisConnection = mock(RedisConnection.class);
+            RedisHashCommands hashCommands = mock(RedisHashCommands.class);
+            when(hashCommands.hSet(any(byte[].class), any(byte[].class), any(byte[].class))).thenAnswer((Answer<Boolean>) invocationOnMock -> {
+                invocationCount++;
+                if (failAfterN > 0 && invocationCount > failAfterN) {
+                    throw new UncheckedIOException(new IOException("error during hset"));
+                }
+                final byte[] hashValue = invocationOnMock.getArgument(0);
+                final byte[] keyValue = invocationOnMock.getArgument(1);
+                final byte[] valueValue = invocationOnMock.getArgument(2);
+
+                if (hashValue == null || keyValue == null || valueValue == null) {
+                    throw new NullPointerException("hash, key, and value must not be null");
+                }
+
+                // Decode with an explicit charset so the mock does not depend on the platform default
+                final String hashString = new String(hashValue, StandardCharsets.UTF_8);
+                hashStore.computeIfAbsent(hashString, key -> new HashMap<>())
+                        .put(new String(keyValue, StandardCharsets.UTF_8), new String(valueValue, StandardCharsets.UTF_8));
+
+                return Boolean.TRUE;
+            });
+            when(mockRedisConnection.hashCommands()).thenReturn(hashCommands);
+            return mockRedisConnection;
+        }
+
+        /**
+         * Sets how many hSet calls succeed on a connection before subsequent calls throw; negative values are treated as 0 (never fail).
+         */
+        public void setFailAfterN(int triesBeforeFailure) {
+            failAfterN = Math.max(triesBeforeFailure, 0);
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
index 191930c7de..aec018954e 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -125,7 +126,7 @@ public class ITRedisDistributedMapCacheClientService {
// create, configure, and enable the RedisDistributedMapCacheClient service
redisMapCacheClientService = new RedisDistributedMapCacheClientService();
testRunner.addControllerService("redis-map-cache-client", redisMapCacheClientService);
- testRunner.setProperty(redisMapCacheClientService, RedisDistributedMapCacheClientService.REDIS_CONNECTION_POOL, "redis-connection-pool");
+ testRunner.setProperty(redisMapCacheClientService, REDIS_CONNECTION_POOL, "redis-connection-pool");
testRunner.enableControllerService(redisMapCacheClientService);
testRunner.setProperty(TestRedisProcessor.REDIS_MAP_CACHE, "redis-map-cache-client");
}
@@ -182,7 +183,7 @@ public class ITRedisDistributedMapCacheClientService {
final String key = "test-redis-processor-" + timestamp;
final String value = "the time is " + timestamp;
- // verify the key doesn't exists, put the key/value, then verify it exists
+ // verify the key doesn't exist, put the key/value, then verify it exists
assertFalse(cacheClient.containsKey(key, stringSerializer));
cacheClient.put(key, value, stringSerializer, stringSerializer);
assertTrue(cacheClient.containsKey(key, stringSerializer));