You are viewing a plain text version of this content. The canonical link for it is here: [link target not preserved in this plain-text copy].
Posted to commits@nifi.apache.org by jo...@apache.org on 2023/01/25 22:10:54 UTC

[nifi] branch main updated: NIFI-11096 This closes #6889. Added EncodeContent to Processor services definition

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5101db2ab7 NIFI-11096 This closes #6889. Added EncodeContent to Processor services definition
5101db2ab7 is described below

commit 5101db2ab736bab4f08acce90f7880f0cb06de78
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Jan 25 15:43:39 2023 -0600

    NIFI-11096 This closes #6889. Added EncodeContent to Processor services definition
    
    - Deprecated Base64EncodeContent in favor of EncodeContent
    - Updated EncodeContent to use Commons Codec for Hexadecimal encoding
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../processors/standard/Base64EncodeContent.java   |   5 ++
 .../nifi/processors/standard/EncodeContent.java    | 100 +++++++++------------
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../processors/standard/TestEncodeContent.java     |  24 ++---
 4 files changed, 63 insertions(+), 67 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index cdade756d7..74f06e1dce 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -54,6 +55,10 @@ import org.apache.nifi.util.StopWatch;
 @Tags({"encode", "base64"})
 @CapabilityDescription("Encodes or decodes content to and from base64")
 @InputRequirement(Requirement.INPUT_REQUIRED)
+@DeprecationNotice(
+        alternatives = EncodeContent.class,
+        reason = "EncodeContent supports Base64 and additional encoding schemes"
+)
 public class Base64EncodeContent extends AbstractProcessor {
 
     public static final String ENCODE_MODE = "Encode";
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
index 469e4ecd92..869605b7d2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
@@ -19,10 +19,9 @@ package org.apache.nifi.processors.standard;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -33,7 +32,6 @@ import org.apache.commons.codec.binary.Base32OutputStream;
 import org.apache.commons.codec.binary.Base64InputStream;
 import org.apache.commons.codec.binary.Base64OutputStream;
 import org.apache.commons.codec.binary.Hex;
-import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -42,11 +40,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processors.standard.util.ValidatingBase32InputStream;
@@ -54,18 +50,16 @@ import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
-@EventDriven
 @SideEffectFree
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"encode", "decode", "base64", "hex"})
-@CapabilityDescription("Encodes the FlowFile content in base64")
+@CapabilityDescription("Encode or decode contents using configurable encoding schemes")
 public class EncodeContent extends AbstractProcessor {
 
     public static final String ENCODE_MODE = "Encode";
     public static final String DECODE_MODE = "Decode";
 
-    // List of support encodings.
     public static final String BASE64_ENCODING = "base64";
     public static final String BASE32_ENCODING = "base32";
     public static final String HEX_ENCODING = "hex";
@@ -95,21 +89,23 @@ public class EncodeContent extends AbstractProcessor {
             .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
             .build();
 
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
+    private static final int BUFFER_SIZE = 8192;
 
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(MODE);
-        props.add(ENCODING);
-        this.properties = Collections.unmodifiableList(props);
-
-        final Set<Relationship> rels = new HashSet<>();
-        rels.add(REL_SUCCESS);
-        rels.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(rels);
-    }
+    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(
+                    MODE,
+                    ENCODING
+            )
+    );
+
+    private static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new LinkedHashSet<>(
+                Arrays.asList(
+                        REL_SUCCESS,
+                        REL_FAILURE
+                )
+            )
+    );
 
     @Override
     public Set<Relationship> getRelationships() {
@@ -128,45 +124,37 @@ public class EncodeContent extends AbstractProcessor {
             return;
         }
 
-        final ComponentLog logger = getLogger();
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String encoding = context.getProperty(ENCODING).getValue();
+        final StreamCallback callback;
 
-        boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
-        String encoding = context.getProperty(ENCODING).getValue();
-        StreamCallback encoder = null;
-
-        // Select the encoder/decoder to use
         if (encode) {
             if (encoding.equalsIgnoreCase(BASE64_ENCODING)) {
-                encoder = new EncodeBase64();
+                callback = new EncodeBase64();
             } else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) {
-                encoder = new EncodeBase32();
-            } else if (encoding.equalsIgnoreCase(HEX_ENCODING)) {
-                encoder = new EncodeHex();
+                callback = new EncodeBase32();
+            } else {
+                callback = new EncodeHex();
             }
         } else {
             if (encoding.equalsIgnoreCase(BASE64_ENCODING)) {
-                encoder = new DecodeBase64();
+                callback = new DecodeBase64();
             } else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) {
-                encoder = new DecodeBase32();
-            } else if (encoding.equalsIgnoreCase(HEX_ENCODING)) {
-                encoder = new DecodeHex();
+                callback = new DecodeBase32();
+            } else {
+                callback = new DecodeHex();
             }
         }
 
-        if (encoder == null) {
-            logger.warn("Unknown operation: {} {}", new Object[]{encode ? "encode" : "decode", encoding});
-            return;
-        }
-
         try {
             final StopWatch stopWatch = new StopWatch(true);
-            flowFile = session.write(flowFile, encoder);
+            flowFile = session.write(flowFile, callback);
 
-            logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
+            getLogger().info("{} completed {}", encode ? "Encoding" : "Decoding", flowFile);
             session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (Exception e) {
-            logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
+        } catch (final Exception e) {
+            getLogger().error("{} failed {}", encode ? "Encoding" : "Decoding", flowFile, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
@@ -174,7 +162,7 @@ public class EncodeContent extends AbstractProcessor {
     private static class EncodeBase64 implements StreamCallback {
 
         @Override
-        public void process(InputStream in, OutputStream out) throws IOException {
+        public void process(final InputStream in, final OutputStream out) throws IOException {
             try (Base64OutputStream bos = new Base64OutputStream(out)) {
                 StreamUtils.copy(in, bos);
             }
@@ -184,7 +172,7 @@ public class EncodeContent extends AbstractProcessor {
     private static class DecodeBase64 implements StreamCallback {
 
         @Override
-        public void process(InputStream in, OutputStream out) throws IOException {
+        public void process(final InputStream in, final OutputStream out) throws IOException {
             try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
                 StreamUtils.copy(bis, out);
             }
@@ -194,7 +182,7 @@ public class EncodeContent extends AbstractProcessor {
     private static class EncodeBase32 implements StreamCallback {
 
         @Override
-        public void process(InputStream in, OutputStream out) throws IOException {
+        public void process(final InputStream in, final OutputStream out) throws IOException {
             try (Base32OutputStream bos = new Base32OutputStream(out)) {
                 StreamUtils.copy(in, bos);
             }
@@ -204,19 +192,19 @@ public class EncodeContent extends AbstractProcessor {
     private static class DecodeBase32 implements StreamCallback {
 
         @Override
-        public void process(InputStream in, OutputStream out) throws IOException {
+        public void process(final InputStream in, final OutputStream out) throws IOException {
             try (Base32InputStream bis = new Base32InputStream(new ValidatingBase32InputStream(in))) {
                 StreamUtils.copy(bis, out);
             }
         }
     }
 
-    private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
-
     private static class EncodeHex implements StreamCallback {
 
+        private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+
         @Override
-        public void process(InputStream in, OutputStream out) throws IOException {
+        public void process(final InputStream in, final OutputStream out) throws IOException {
             int len;
             byte[] inBuf = new byte[8192];
             byte[] outBuf = new byte[inBuf.length * 2];
@@ -234,9 +222,9 @@ public class EncodeContent extends AbstractProcessor {
     private static class DecodeHex implements StreamCallback {
 
         @Override
-        public void process(InputStream in, OutputStream out) throws IOException {
+        public void process(final InputStream in, final OutputStream out) throws IOException {
             int len;
-            byte[] inBuf = new byte[8192];
+            byte[] inBuf = new byte[BUFFER_SIZE];
             Hex h = new Hex();
             while ((len = in.read(inBuf)) > 0) {
                 // If the input buffer is of odd length, try to get another byte
@@ -252,8 +240,8 @@ public class EncodeContent extends AbstractProcessor {
                 byte[] slice = Arrays.copyOfRange(inBuf, 0, len);
                 try {
                     out.write(h.decode(slice));
-                } catch (DecoderException ex) {
-                    throw new IOException(ex);
+                } catch (final DecoderException e) {
+                    throw new IOException("Hexadecimal decoding failed", e);
                 }
             }
             out.flush();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e0db844bb5..0ad3ce3fe8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -29,6 +29,7 @@ org.apache.nifi.processors.standard.DetectDuplicate
 org.apache.nifi.processors.standard.DeduplicateRecord
 org.apache.nifi.processors.standard.DistributeLoad
 org.apache.nifi.processors.standard.DuplicateFlowFile
+org.apache.nifi.processors.standard.EncodeContent
 org.apache.nifi.processors.standard.EncryptContent
 org.apache.nifi.processors.standard.EnforceOrder
 org.apache.nifi.processors.standard.EvaluateJsonPath
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
index a0fcb28588..b8da70c754 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
@@ -16,8 +16,8 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 
 import org.apache.nifi.util.MockFlowFile;
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.Test;
 
 public class TestEncodeContent {
 
+    private static final Path FILE_PATH = Paths.get("src/test/resources/hello.txt");
+
     @Test
     public void testBase64RoundTrip() throws IOException {
         final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent());
@@ -34,7 +36,7 @@ public class TestEncodeContent {
         testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
         testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING);
 
-        testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+        testRunner.enqueue(FILE_PATH);
         testRunner.clearTransferState();
         testRunner.run();
 
@@ -50,7 +52,7 @@ public class TestEncodeContent {
         testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
 
         flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
-        flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
+        flowFile.assertContentEquals(FILE_PATH);
     }
 
     @Test
@@ -60,7 +62,7 @@ public class TestEncodeContent {
         testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
         testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING);
 
-        testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+        testRunner.enqueue(FILE_PATH);
         testRunner.clearTransferState();
         testRunner.run();
 
@@ -68,7 +70,7 @@ public class TestEncodeContent {
     }
 
     @Test
-    public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() throws IOException {
+    public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() {
         final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent());
 
         testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
@@ -88,7 +90,7 @@ public class TestEncodeContent {
         testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
         testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING);
 
-        testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+        testRunner.enqueue(FILE_PATH);
         testRunner.clearTransferState();
         testRunner.run();
 
@@ -104,7 +106,7 @@ public class TestEncodeContent {
         testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
 
         flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
-        flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
+        flowFile.assertContentEquals(FILE_PATH);
     }
 
     @Test
@@ -114,7 +116,7 @@ public class TestEncodeContent {
         testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
         testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING);
 
-        testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+        testRunner.enqueue(FILE_PATH);
         testRunner.clearTransferState();
         testRunner.run();
 
@@ -128,7 +130,7 @@ public class TestEncodeContent {
         testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
         testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING);
 
-        testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+        testRunner.enqueue(FILE_PATH);
         testRunner.clearTransferState();
         testRunner.run();
 
@@ -144,7 +146,7 @@ public class TestEncodeContent {
         testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
 
         flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
-        flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
+        flowFile.assertContentEquals(FILE_PATH);
     }
 
     @Test
@@ -154,7 +156,7 @@ public class TestEncodeContent {
         testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
         testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING);
 
-        testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+        testRunner.enqueue(FILE_PATH);
         testRunner.clearTransferState();
         testRunner.run();