You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/16 09:51:19 UTC
[28/50] [abbrv] camel git commit: CAMEL-9966: Restlet - Should not enable stream by default.
CAMEL-9966: Restlet - Should not enable stream by default.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7a0c3f8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7a0c3f8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7a0c3f8
Branch: refs/heads/kube-lb
Commit: c7a0c3f8373139b171e9b7fac41c32412e8bf6bb
Parents: 81f23c1
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 13 16:50:20 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 16 09:59:33 2016 +0200
----------------------------------------------------------------------
.../restlet/DefaultRestletBinding.java | 27 ++++++++++-
.../component/restlet/RestletEndpoint.java | 51 ++++++++++++++++++--
.../component/restlet/RestletOnCompletion.java | 38 +++++++++++++++
.../RestletProducerBinaryStreamTest.java | 22 ++++++++-
4 files changed, 129 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a0c3f8/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java
index b95ef7a..3ab9d1b 100644
--- a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java
+++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java
@@ -76,6 +76,8 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate
private static final Logger LOG = LoggerFactory.getLogger(DefaultRestletBinding.class);
private static final String RFC_2822_DATE_PATTERN = "EEE, dd MMM yyyy HH:mm:ss Z";
private HeaderFilterStrategy headerFilterStrategy;
+ private boolean streamRepresentation;
+ private boolean autoCloseStream;
public void populateExchangeFromRestletRequest(Request request, Response response, Exchange exchange) throws Exception {
Message inMessage = exchange.getIn();
@@ -363,9 +365,14 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate
LOG.debug("Setting the Content-Type to be {}", mediaType.toString());
exchange.getOut().setHeader(Exchange.CONTENT_TYPE, mediaType.toString());
}
- if (response.getEntity() instanceof StreamRepresentation) {
+ if (streamRepresentation && response.getEntity() instanceof StreamRepresentation) {
Representation representationDecoded = new DecodeRepresentation(response.getEntity());
- exchange.getOut().setBody(representationDecoded.getStream());
+ InputStream is = representationDecoded.getStream();
+ exchange.getOut().setBody(is);
+ if (autoCloseStream) {
+ // ensure the input stream is closed when we are done routing
+ exchange.addOnCompletion(new RestletOnCompletion(is));
+ }
} else if (response.getEntity() instanceof Representation) {
Representation representationDecoded = new DecodeRepresentation(response.getEntity());
exchange.getOut().setBody(representationDecoded.getText());
@@ -584,4 +591,20 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate
public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
headerFilterStrategy = strategy;
}
+
+ public boolean isStreamRepresentation() {
+ return streamRepresentation;
+ }
+
+ public void setStreamRepresentation(boolean streamRepresentation) {
+ this.streamRepresentation = streamRepresentation;
+ }
+
+ public boolean isAutoCloseStream() {
+ return autoCloseStream;
+ }
+
+ public void setAutoCloseStream(boolean autoCloseStream) {
+ this.autoCloseStream = autoCloseStream;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a0c3f8/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
index 63ae6fa..ec2871b 100644
--- a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
+++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.restlet;
+import java.io.InputStream;
import java.util.List;
import java.util.Map;
@@ -23,6 +24,7 @@ import org.apache.camel.AsyncEndpoint;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
@@ -66,18 +68,22 @@ public class RestletEndpoint extends DefaultEndpoint implements AsyncEndpoint, H
private Method[] restletMethods;
@UriParam(label = "consumer")
private List<String> restletUriPatterns;
- @UriParam
+ @UriParam(label = "security")
private Map<String, String> restletRealm;
- @UriParam
+ @UriParam(label = "advanced")
private HeaderFilterStrategy headerFilterStrategy;
- @UriParam
+ @UriParam(label = "advanced")
private RestletBinding restletBinding;
@UriParam(label = "producer", defaultValue = "true")
private boolean throwExceptionOnFailure = true;
- @UriParam
+ @UriParam(label = "advanced")
private boolean disableStreamCache;
- @UriParam
+ @UriParam(label = "security")
private SSLContextParameters sslContextParameters;
+ @UriParam(label = "producer,advanced")
+ private boolean streamRepresentation;
+ @UriParam(label = "producer,advanced")
+ private boolean autoCloseStream;
public RestletEndpoint(RestletComponent component, String remaining) throws Exception {
super(remaining, component);
@@ -306,6 +312,37 @@ public class RestletEndpoint extends DefaultEndpoint implements AsyncEndpoint, H
this.sslContextParameters = scp;
}
+ public boolean isStreamRepresentation() {
+ return streamRepresentation;
+ }
+
+ /**
+ * Whether to support stream representation as response from calling a REST service using the restlet producer.
+ * If the response is streaming then this option can be enabled to use an {@link java.io.InputStream} as the
+ * body of the Camel {@link Message}. If using this option you may want to enable the
+ * autoCloseStream option as well to ensure the input stream is closed when the Camel {@link Exchange}
+ * is done being routed. However if you need to read the stream outside a Camel route, you may need
+ * to not auto close the stream.
+ */
+ public void setStreamRepresentation(boolean streamRepresentation) {
+ this.streamRepresentation = streamRepresentation;
+ }
+
+ public boolean isAutoCloseStream() {
+ return autoCloseStream;
+ }
+
+ /**
+ * Whether to auto close the stream representation as response from calling a REST service using the restlet producer.
+ * If the response is streaming and the option streamRepresentation is enabled then you may want to auto close
+ * the {@link InputStream} from the streaming response to ensure the input stream is closed when the Camel {@link Exchange}
+ * is done being routed. However if you need to read the stream outside a Camel route, you may need
+ * to not auto close the stream.
+ */
+ public void setAutoCloseStream(boolean autoCloseStream) {
+ this.autoCloseStream = autoCloseStream;
+ }
+
// Update the endpointUri with the restlet method information
protected void updateEndpointUri() {
String endpointUri = getEndpointUri();
@@ -336,6 +373,10 @@ public class RestletEndpoint extends DefaultEndpoint implements AsyncEndpoint, H
if (restletBinding instanceof HeaderFilterStrategyAware) {
((HeaderFilterStrategyAware) restletBinding).setHeaderFilterStrategy(getHeaderFilterStrategy());
}
+ if (restletBinding instanceof DefaultRestletBinding) {
+ ((DefaultRestletBinding) restletBinding).setStreamRepresentation(isStreamRepresentation());
+ ((DefaultRestletBinding) restletBinding).setAutoCloseStream(isAutoCloseStream());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a0c3f8/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java
new file mode 100644
index 0000000..5c2f684
--- /dev/null
+++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.restlet;
+
+import java.io.InputStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+
+public class RestletOnCompletion extends SynchronizationAdapter {
+
+ private final InputStream inputStream;
+
+ public RestletOnCompletion(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public void onDone(Exchange exchange) {
+ IOHelper.close(inputStream);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a0c3f8/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java
----------------------------------------------------------------------
diff --git a/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java b/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java
index 4cb24e7..bc15e17 100644
--- a/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java
+++ b/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.restlet;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
@@ -34,7 +36,7 @@ public class RestletProducerBinaryStreamTest extends RestletTestSupport {
@Test
public void shouldHandleBinaryOctetStream() throws Exception {
- Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream", null);
+ Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream?streamRepresentation=true", null);
assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("application/octet-stream"));
assertThat(response.getOut().getBody(byte[].class), equalTo(getAllBytes()));
@@ -42,12 +44,28 @@ public class RestletProducerBinaryStreamTest extends RestletTestSupport {
@Test
public void shouldHandleBinaryAudioMpeg() throws Exception {
- Exchange response = template.request("restlet:http://localhost:" + portNum + "/audio/mpeg", null);
+ Exchange response = template.request("restlet:http://localhost:" + portNum + "/audio/mpeg?streamRepresentation=true", null);
assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("audio/mpeg"));
assertThat(response.getOut().getBody(byte[].class), equalTo(getAllBytes()));
}
+ @Test
+ public void shouldAutoClose() throws Exception {
+ Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream?streamRepresentation=true&autoCloseStream=true", null);
+
+ assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("application/octet-stream"));
+ InputStream is = (InputStream) response.getOut().getBody();
+ assertNotNull(is);
+
+ try {
+ is.read();
+ fail("Should be closed");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {