You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/12 13:29:43 UTC

[camel] branch camel-2.25.x updated: CAMEL-16177: multicast parallel processing and encrypted stream cache (#5069)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.25.x by this push:
     new 85cfc66  CAMEL-16177: multicast parallel processing and encrypted stream cache (#5069)
85cfc66 is described below

commit 85cfc66dff34fee38891d6d6495f74c6e1952f3a
Author: forsthofer <fo...@users.noreply.github.com>
AuthorDate: Fri Feb 12 14:29:23 2021 +0100

    CAMEL-16177: multicast parallel processing and encrypted stream cache (#5069)
    
    * CAMEL-16177: multicast parallel processing and encrypted stream cache
    
    * CAMEL-16177: correct error message
    
    * CAMEL-16177: correcting license header
    
    Co-authored-by: Franz Forsthofer <fr...@sap.com>
---
 .../apache/camel/converter/stream/CipherPair.java  |  24 +++--
 ...ParallelAndStreamCachingWithEncryptionTest.java | 113 +++++++++++++++++++++
 .../org/apache/camel/processor/payload10KB.txt     |   1 +
 3 files changed, 131 insertions(+), 7 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
index 159699b..7cbdacc 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CipherPair.java
@@ -30,11 +30,12 @@ import javax.crypto.spec.IvParameterSpec;
 public class CipherPair {
     private final String transformation;
     private final Cipher enccipher;
-    private final Cipher deccipher;
+    private final Key key;
+    private final byte[] ivp;
     
     public CipherPair(String transformation) throws GeneralSecurityException {
         this.transformation = transformation;
-        
+
         int d = transformation.indexOf('/');
         String a;
         if (d > 0) {
@@ -45,12 +46,10 @@ public class CipherPair {
 
         KeyGenerator keygen = KeyGenerator.getInstance(a);
         keygen.init(new SecureRandom());
-        Key key = keygen.generateKey();
+        key = keygen.generateKey();
         enccipher = Cipher.getInstance(transformation);
-        deccipher = Cipher.getInstance(transformation);
         enccipher.init(Cipher.ENCRYPT_MODE, key);
-        final byte[] ivp = enccipher.getIV();
-        deccipher.init(Cipher.DECRYPT_MODE, key, ivp == null ? null : new IvParameterSpec(ivp));
+        ivp = enccipher.getIV();
     }
     
     public String getTransformation() {
@@ -61,7 +60,18 @@ public class CipherPair {
         return enccipher;
     }
     
+    /**
+     * Create the decryptor every time because the decryptor is not thead safe. For example, if you reuse the decryptor
+     * instance in the Multi-cast case then you will get errors.
+     */
     public Cipher getDecryptor() {
-        return deccipher;
+        try {
+            Cipher deccipher = Cipher.getInstance(transformation);
+            deccipher.init(Cipher.DECRYPT_MODE, key, ivp == null ? null : new IvParameterSpec(ivp));
+            return deccipher;
+        } catch (GeneralSecurityException e) {
+            // should not happen
+            throw new IllegalStateException("Could not instanciate decryptor", e);
+        }
     }
 }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingWithEncryptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingWithEncryptionTest.java
new file mode 100644
index 0000000..14aeabc
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingWithEncryptionTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.IOHelper;
+import org.junit.Test;
+
+/**
+ * Tests the processing of a file stream-cache with encryption by the multi-cast
+ * processor in the parallel processing mode.
+ */
+public class MultiCastParallelAndStreamCachingWithEncryptionTest extends ContextTestSupport {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setStreamCaching(true);
+                context.getStreamCachingStrategy().setEnabled(true);
+                context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
+                context.getStreamCachingStrategy().setSpoolThreshold(5000L);
+                context.getStreamCachingStrategy().setSpoolCipher("AES/CTR/NoPadding");
+
+                from("direct:start").multicast().parallelProcessing().stopOnException().to("direct:a", "direct:b").end()
+                        .to("mock:result");
+
+                from("direct:a") //
+                        // read stream
+                        .process(new SimpleProcessor()).to("mock:resulta");
+
+                from("direct:b") //
+                        // read stream concurrently, because of parallel processing
+                        .process(new SimpleProcessor()).to("mock:resultb");
+
+            }
+        };
+    }
+
+    private static class SimpleProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+
+            Object body = exchange.getIn().getBody();
+            if (body instanceof InputStream) {
+                ByteArrayOutputStream output = new ByteArrayOutputStream();
+                IOHelper.copy((InputStream) body, output);
+                exchange.getMessage().setBody(output.toByteArray());
+            } else {
+                throw new RuntimeException("Type " + body.getClass().getName() + " not supported");
+            }
+
+        }
+    }
+
+    /**
+     * Tests the FileInputStreamCache. The sent input stream is transformed to
+     * FileInputStreamCache before the multi-cast processor is called.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testFileInputStreamCache() throws Exception {
+
+        InputStream resultA = getPayload();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOHelper.copy(resultA, baos);
+        IOHelper.close(resultA);
+        byte[] resultBytes = baos.toByteArray();
+
+        MockEndpoint mock = getMockEndpoint("mock:resulta");
+        mock.expectedBodiesReceived(resultBytes);
+        mock = getMockEndpoint("mock:resultb");
+        mock.expectedBodiesReceived(resultBytes);
+
+        InputStream in = getPayload();
+        try {
+            template.sendBody("direct:start", in);
+            assertMockEndpointsSatisfied();
+        } finally {
+            in.close();
+        }
+    }
+
+    private InputStream getPayload() {
+        return MultiCastParallelAndStreamCachingWithEncryptionTest.class.getClassLoader()
+                .getResourceAsStream("org/apache/camel/processor/payload10KB.txt");
+    }
+
+}
diff --git a/camel-core/src/test/resources/org/apache/camel/processor/payload10KB.txt b/camel-core/src/test/resources/org/apache/camel/processor/payload10KB.txt
new file mode 100644
index 0000000..59fde05
--- /dev/null
+++ b/camel-core/src/test/resources/org/apache/camel/processor/payload10KB.txt
@@ -0,0 +1 @@
+payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102pa [...]
\ No newline at end of file