You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2023/01/25 22:10:54 UTC
[nifi] branch main updated: NIFI-11096 This closes #6889. Added EncodeContent to Processor services definition
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5101db2ab7 NIFI-11096 This closes #6889. Added EncodeContent to Processor services definition
5101db2ab7 is described below
commit 5101db2ab736bab4f08acce90f7880f0cb06de78
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Jan 25 15:43:39 2023 -0600
NIFI-11096 This closes #6889. Added EncodeContent to Processor services definition
- Deprecated Base64EncodeContent in favor of EncodeContent
- Updated EncodeContent to use Commons Codec for Hexadecimal encoding
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../processors/standard/Base64EncodeContent.java | 5 ++
.../nifi/processors/standard/EncodeContent.java | 100 +++++++++------------
.../services/org.apache.nifi.processor.Processor | 1 +
.../processors/standard/TestEncodeContent.java | 24 ++---
4 files changed, 63 insertions(+), 67 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index cdade756d7..74f06e1dce 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@@ -54,6 +55,10 @@ import org.apache.nifi.util.StopWatch;
@Tags({"encode", "base64"})
@CapabilityDescription("Encodes or decodes content to and from base64")
@InputRequirement(Requirement.INPUT_REQUIRED)
+@DeprecationNotice(
+ alternatives = EncodeContent.class,
+ reason = "EncodeContent supports Base64 and additional encoding schemes"
+)
public class Base64EncodeContent extends AbstractProcessor {
public static final String ENCODE_MODE = "Encode";
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
index 469e4ecd92..869605b7d2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
@@ -19,10 +19,9 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -33,7 +32,6 @@ import org.apache.commons.codec.binary.Base32OutputStream;
import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
-import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -42,11 +40,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.util.ValidatingBase32InputStream;
@@ -54,18 +50,16 @@ import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
-@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"encode", "decode", "base64", "hex"})
-@CapabilityDescription("Encodes the FlowFile content in base64")
+@CapabilityDescription("Encode or decode contents using configurable encoding schemes")
public class EncodeContent extends AbstractProcessor {
public static final String ENCODE_MODE = "Encode";
public static final String DECODE_MODE = "Decode";
- // List of support encodings.
public static final String BASE64_ENCODING = "base64";
public static final String BASE32_ENCODING = "base32";
public static final String HEX_ENCODING = "hex";
@@ -95,21 +89,23 @@ public class EncodeContent extends AbstractProcessor {
.description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
.build();
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
+ private static final int BUFFER_SIZE = 8192;
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(MODE);
- props.add(ENCODING);
- this.properties = Collections.unmodifiableList(props);
-
- final Set<Relationship> rels = new HashSet<>();
- rels.add(REL_SUCCESS);
- rels.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(rels);
- }
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+ Arrays.asList(
+ MODE,
+ ENCODING
+ )
+ );
+
+ private static final Set<Relationship> relationships = Collections.unmodifiableSet(
+ new LinkedHashSet<>(
+ Arrays.asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )
+ )
+ );
@Override
public Set<Relationship> getRelationships() {
@@ -128,45 +124,37 @@ public class EncodeContent extends AbstractProcessor {
return;
}
- final ComponentLog logger = getLogger();
+ final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+ final String encoding = context.getProperty(ENCODING).getValue();
+ final StreamCallback callback;
- boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
- String encoding = context.getProperty(ENCODING).getValue();
- StreamCallback encoder = null;
-
- // Select the encoder/decoder to use
if (encode) {
if (encoding.equalsIgnoreCase(BASE64_ENCODING)) {
- encoder = new EncodeBase64();
+ callback = new EncodeBase64();
} else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) {
- encoder = new EncodeBase32();
- } else if (encoding.equalsIgnoreCase(HEX_ENCODING)) {
- encoder = new EncodeHex();
+ callback = new EncodeBase32();
+ } else {
+ callback = new EncodeHex();
}
} else {
if (encoding.equalsIgnoreCase(BASE64_ENCODING)) {
- encoder = new DecodeBase64();
+ callback = new DecodeBase64();
} else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) {
- encoder = new DecodeBase32();
- } else if (encoding.equalsIgnoreCase(HEX_ENCODING)) {
- encoder = new DecodeHex();
+ callback = new DecodeBase32();
+ } else {
+ callback = new DecodeHex();
}
}
- if (encoder == null) {
- logger.warn("Unknown operation: {} {}", new Object[]{encode ? "encode" : "decode", encoding});
- return;
- }
-
try {
final StopWatch stopWatch = new StopWatch(true);
- flowFile = session.write(flowFile, encoder);
+ flowFile = session.write(flowFile, callback);
- logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
+ getLogger().info("{} completed {}", encode ? "Encoding" : "Decoding", flowFile);
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
- } catch (Exception e) {
- logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
+ } catch (final Exception e) {
+ getLogger().error("{} failed {}", encode ? "Encoding" : "Decoding", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
}
}
@@ -174,7 +162,7 @@ public class EncodeContent extends AbstractProcessor {
private static class EncodeBase64 implements StreamCallback {
@Override
- public void process(InputStream in, OutputStream out) throws IOException {
+ public void process(final InputStream in, final OutputStream out) throws IOException {
try (Base64OutputStream bos = new Base64OutputStream(out)) {
StreamUtils.copy(in, bos);
}
@@ -184,7 +172,7 @@ public class EncodeContent extends AbstractProcessor {
private static class DecodeBase64 implements StreamCallback {
@Override
- public void process(InputStream in, OutputStream out) throws IOException {
+ public void process(final InputStream in, final OutputStream out) throws IOException {
try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
StreamUtils.copy(bis, out);
}
@@ -194,7 +182,7 @@ public class EncodeContent extends AbstractProcessor {
private static class EncodeBase32 implements StreamCallback {
@Override
- public void process(InputStream in, OutputStream out) throws IOException {
+ public void process(final InputStream in, final OutputStream out) throws IOException {
try (Base32OutputStream bos = new Base32OutputStream(out)) {
StreamUtils.copy(in, bos);
}
@@ -204,19 +192,19 @@ public class EncodeContent extends AbstractProcessor {
private static class DecodeBase32 implements StreamCallback {
@Override
- public void process(InputStream in, OutputStream out) throws IOException {
+ public void process(final InputStream in, final OutputStream out) throws IOException {
try (Base32InputStream bis = new Base32InputStream(new ValidatingBase32InputStream(in))) {
StreamUtils.copy(bis, out);
}
}
}
- private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
-
private static class EncodeHex implements StreamCallback {
+ private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+
@Override
- public void process(InputStream in, OutputStream out) throws IOException {
+ public void process(final InputStream in, final OutputStream out) throws IOException {
int len;
byte[] inBuf = new byte[8192];
byte[] outBuf = new byte[inBuf.length * 2];
@@ -234,9 +222,9 @@ public class EncodeContent extends AbstractProcessor {
private static class DecodeHex implements StreamCallback {
@Override
- public void process(InputStream in, OutputStream out) throws IOException {
+ public void process(final InputStream in, final OutputStream out) throws IOException {
int len;
- byte[] inBuf = new byte[8192];
+ byte[] inBuf = new byte[BUFFER_SIZE];
Hex h = new Hex();
while ((len = in.read(inBuf)) > 0) {
// If the input buffer is of odd length, try to get another byte
@@ -252,8 +240,8 @@ public class EncodeContent extends AbstractProcessor {
byte[] slice = Arrays.copyOfRange(inBuf, 0, len);
try {
out.write(h.decode(slice));
- } catch (DecoderException ex) {
- throw new IOException(ex);
+ } catch (final DecoderException e) {
+ throw new IOException("Hexadecimal decoding failed", e);
}
}
out.flush();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e0db844bb5..0ad3ce3fe8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -29,6 +29,7 @@ org.apache.nifi.processors.standard.DetectDuplicate
org.apache.nifi.processors.standard.DeduplicateRecord
org.apache.nifi.processors.standard.DistributeLoad
org.apache.nifi.processors.standard.DuplicateFlowFile
+org.apache.nifi.processors.standard.EncodeContent
org.apache.nifi.processors.standard.EncryptContent
org.apache.nifi.processors.standard.EnforceOrder
org.apache.nifi.processors.standard.EvaluateJsonPath
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
index a0fcb28588..b8da70c754 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
@@ -16,8 +16,8 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.nifi.util.MockFlowFile;
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.Test;
public class TestEncodeContent {
+ private static final Path FILE_PATH = Paths.get("src/test/resources/hello.txt");
+
@Test
public void testBase64RoundTrip() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent());
@@ -34,7 +36,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING);
- testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+ testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@@ -50,7 +52,7 @@ public class TestEncodeContent {
testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
- flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
+ flowFile.assertContentEquals(FILE_PATH);
}
@Test
@@ -60,7 +62,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING);
- testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+ testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@@ -68,7 +70,7 @@ public class TestEncodeContent {
}
@Test
- public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() throws IOException {
+ public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() {
final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent());
testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
@@ -88,7 +90,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING);
- testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+ testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@@ -104,7 +106,7 @@ public class TestEncodeContent {
testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
- flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
+ flowFile.assertContentEquals(FILE_PATH);
}
@Test
@@ -114,7 +116,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING);
- testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+ testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@@ -128,7 +130,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING);
- testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+ testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@@ -144,7 +146,7 @@ public class TestEncodeContent {
testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
- flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
+ flowFile.assertContentEquals(FILE_PATH);
}
@Test
@@ -154,7 +156,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING);
- testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+ testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();