You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/08 15:09:35 UTC
[camel] branch master updated: CAMEL-14162: camel-stream - When using http url then data is not sent over the wire
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new ce087f6 CAMEL-14162: camel-stream - When using http url then data is not sent over the wire
ce087f6 is described below
commit ce087f649fb10a0a3192577909dfe0cf9bd8a5a1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Nov 8 16:06:02 2019 +0100
CAMEL-14162: camel-stream - When using http url then data is not sent over the wire
---
components/camel-stream/pom.xml | 5 ++
.../camel/component/stream/StreamProducer.java | 26 ++++++++---
.../component/stream/StreamToUrlJettyTest.java | 53 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/components/camel-stream/pom.xml b/components/camel-stream/pom.xml
index b1a967a..1a4ecf0 100644
--- a/components/camel-stream/pom.xml
+++ b/components/camel-stream/pom.xml
@@ -69,6 +69,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jetty</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
index 6637b32..2693eb1 100644
--- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
@@ -20,6 +20,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
@@ -47,6 +48,7 @@ public class StreamProducer extends DefaultProducer {
private StreamEndpoint endpoint;
private String uri;
private OutputStream outputStream;
+ private URLConnection urlConnection;
private AtomicInteger count = new AtomicInteger();
public StreamProducer(StreamEndpoint endpoint, String uri) throws Exception {
@@ -86,18 +88,18 @@ public class StreamProducer extends DefaultProducer {
log.debug("About to write to url: {}", u);
URL url = new URL(u);
- URLConnection c = url.openConnection();
- c.setDoOutput(true);
+ urlConnection = url.openConnection();
+ urlConnection.setDoOutput(true);
if (endpoint.getConnectTimeout() > 0) {
- c.setConnectTimeout(endpoint.getConnectTimeout());
+ urlConnection.setConnectTimeout(endpoint.getConnectTimeout());
}
if (endpoint.getReadTimeout() > 0) {
- c.setReadTimeout(endpoint.getReadTimeout());
+ urlConnection.setReadTimeout(endpoint.getReadTimeout());
}
if (endpoint.getHttpHeaders() != null) {
- endpoint.getHttpHeaders().forEach((k, v) -> c.addRequestProperty(k, v.toString()));
+ endpoint.getHttpHeaders().forEach((k, v) -> urlConnection.addRequestProperty(k, v.toString()));
}
- return c.getOutputStream();
+ return urlConnection.getOutputStream();
}
private OutputStream resolveStreamFromFile() throws IOException {
@@ -203,8 +205,20 @@ public class StreamProducer extends DefaultProducer {
// never ever close a system stream
if (!systemStream && expiredStream) {
+
+ if (urlConnection != null) {
+ // force a flush as it may first send data over the wire when we are done
+ try {
+ InputStream is = urlConnection.getInputStream();
+ IOHelper.close(is);
+ } catch (Throwable e) {
+ // ignore
+ }
+ }
+
outputStream.close();
outputStream = null;
+ urlConnection = null;
log.debug("Closed stream '{}'", endpoint.getEndpointKey());
}
}
diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java
new file mode 100644
index 0000000..834d54d
--- /dev/null
+++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Unit test for the stream producer writing to an HTTP URL that is served by a Jetty consumer route.
+ */
+public class StreamToUrlJettyTest extends CamelTestSupport {
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ // autoCloseCount=1: just send one message at a time
+ .to("stream:url?autoCloseCount=1&url=http://localhost:8080/foo&httpHeaders.content-type=text/plain");
+
+ from("jetty:http://localhost:8080/foo")
+ .log("Jetty foo")
+ .to("mock:foo");
+ }
+ };
+ }
+
+ @Test
+ public void shouldSendToUrlOutputStream() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello" + System.lineSeparator(), "World" + System.lineSeparator());
+
+ template.sendBody("direct:start", "Hello");
+ template.sendBody("direct:start", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+}