You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/11/02 14:18:09 UTC

[camel] branch master updated: CAMEL-12835: Fixed camel-json-validator to deal with streaming content not being re-readable and therefore favour using stream caching.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c46180  CAMEL-12835: Fixed camel-json-validator to deal with streaming content not being re-readable and therefore favour using stream caching.
0c46180 is described below

commit 0c46180bc3fef6fb6daf9681a9b388ab74c86e30
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Nov 2 15:16:27 2018 +0100

    CAMEL-12835: Fixed camel-json-validator to deal with streaming content not being re-readable and therefore favour using stream caching.
---
 .../jsonvalidator/JsonValidatorEndpoint.java       | 42 +++++++++++++----
 .../jsonvalidator/ValidatorInputStreamTest.java    | 53 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 10 deletions(-)

diff --git a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
index fb07874..9841e6c 100644
--- a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
+++ b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java
@@ -26,12 +26,12 @@ import com.networknt.schema.ValidationMessage;
 import org.apache.camel.Component;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.StreamCache;
 import org.apache.camel.ValidationException;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.ResourceEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
-import org.apache.camel.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +75,20 @@ public class JsonValidatorEndpoint extends ResourceEndpoint {
     
     @Override
     protected void onExchange(Exchange exchange) throws Exception {
-        InputStream is = null;
+        StreamCache cache = null;
+
+        // if the content is an input stream then it's likely not re-readable, so we need to make it stream cached
+        Object content = getContentToValidate(exchange);
+        if (!(content instanceof StreamCache) && content instanceof InputStream) {
+            cache = exchange.getContext().getTypeConverter().convertTo(StreamCache.class, exchange, content);
+            if (cache != null) {
+                if (shouldUseHeader()) {
+                    exchange.getIn().setHeader(headerName, cache);
+                } else {
+                    exchange.getIn().setBody(cache);
+                }
+            }
+        }
 
         // Get a local copy of the current schema to improve concurrency.
         JsonSchema localSchema = this.schema;
@@ -83,19 +96,26 @@ public class JsonValidatorEndpoint extends ResourceEndpoint {
             localSchema = getOrCreateSchema();
         }
         try {
-            is = getContentToValidate(exchange, InputStream.class);
             if (shouldUseHeader()) {
-                if (is == null && isFailOnNullHeader()) {
+                if (content == null && isFailOnNullHeader()) {
                     throw new NoJsonHeaderValidationException(exchange, headerName);
                 }
             } else {
-                if (is == null && isFailOnNullBody()) {
+                if (content == null && isFailOnNullBody()) {
                     throw new NoJsonBodyValidationException(exchange);
                 }
             }
-            if (is != null) {
+            if (content != null) {
+                // favour using stream caching
+                if (cache == null) {
+                    cache = exchange.getContext().getTypeConverter().convertTo(StreamCache.class, exchange, content);
+                }
                 ObjectMapper mapper = new ObjectMapper();
+                InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, cache != null ? cache : content);
                 JsonNode node = mapper.readTree(is);
+                if (node == null) {
+                    throw new NoJsonBodyValidationException(exchange);
+                }
                 Set<ValidationMessage> errors = localSchema.validate(node);
 
                 if (errors.size() > 0) {
@@ -114,15 +134,17 @@ public class JsonValidatorEndpoint extends ResourceEndpoint {
                 this.errorHandler.handleErrors(exchange, schema, e);
             }
         } finally {
-            IOHelper.close(is);
+            if (cache != null) {
+                cache.reset();
+            }
         }
     }
     
-    private <T> T getContentToValidate(Exchange exchange, Class<T> clazz) {
+    private Object getContentToValidate(Exchange exchange) {
         if (shouldUseHeader()) {
-            return exchange.getIn().getHeader(headerName, clazz);
+            return exchange.getIn().getHeader(headerName);
         } else {
-            return exchange.getIn().getBody(clazz);
+            return exchange.getIn().getBody();
         }
     }
 
diff --git a/components/camel-json-validator/src/test/java/org/apache/camel/component/jsonvalidator/ValidatorInputStreamTest.java b/components/camel-json-validator/src/test/java/org/apache/camel/component/jsonvalidator/ValidatorInputStreamTest.java
new file mode 100644
index 0000000..3bf1d5e
--- /dev/null
+++ b/components/camel-json-validator/src/test/java/org/apache/camel/component/jsonvalidator/ValidatorInputStreamTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jsonvalidator;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class ValidatorInputStreamTest extends CamelTestSupport {
+    
+    @Test
+    public void testReadTwice() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        String body = "{ \"name\": \"Joe Doe\", \"id\": 1, \"price\": 12.5 }";
+        ByteArrayInputStream bais = new ByteArrayInputStream(body.getBytes());
+
+        template.sendBody("direct:start", bais);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("json-validator:org/apache/camel/component/jsonvalidator/schema.json")
+                    .to("mock:foo")
+                    .to("json-validator:org/apache/camel/component/jsonvalidator/schema.json")
+                    .to("mock:bar");
+            }
+        };
+    }
+}