You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@camel.apache.org by da...@apache.org on 2008/05/12 13:51:44 UTC
svn commit: r655481 - in /activemq/camel/trunk/components/camel-stream/src:
main/java/org/apache/camel/component/stream/
test/java/org/apache/camel/component/stream/
Author: davsclaus
Date: Mon May 12 04:51:44 2008
New Revision: 655481
URL: http://svn.apache.org/viewvc?rev=655481&view=rev
Log:
CAMEL-433: using the new getAndRemoveParameter programming best practice
Removed:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponentException.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java
Modified:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java Mon May 12 04:51:44 2008
@@ -18,17 +18,18 @@
import java.util.Map;
import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultComponent;
/**
* Component providing streams connectivity
*/
-public class StreamComponent extends DefaultComponent<StreamExchange> {
+public class StreamComponent extends DefaultComponent<Exchange> {
@Override
- protected Endpoint<StreamExchange> createEndpoint(String uri, String remaining, Map parameters)
+ protected Endpoint<Exchange> createEndpoint(String uri, String remaining, Map parameters)
throws Exception {
- return new StreamEndpoint(this, uri, remaining, parameters);
+ return new StreamEndpoint(uri, this);
}
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Mon May 12 04:51:44 2008
@@ -26,9 +26,7 @@
import java.net.URLConnection;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
@@ -38,30 +36,20 @@
/**
* Consumer that can read from any stream
*/
-public class StreamConsumer extends DefaultConsumer<StreamExchange> {
+public class StreamConsumer extends DefaultConsumer<Exchange> {
private static final transient Log LOG = LogFactory.getLog(StreamConsumer.class);
private static final String TYPES = "in";
private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
- protected InputStream inputStream = System.in;
- protected Endpoint<StreamExchange> endpoint;
+ private InputStream inputStream = System.in;
+ private StreamEndpoint endpoint;
private String uri;
- private String file;
- private String url;
- public StreamConsumer(Endpoint<StreamExchange> endpoint, Processor processor, String uri,
- Map<String, String> parameters) throws Exception {
+ public StreamConsumer(StreamEndpoint endpoint, Processor processor, String uri) throws Exception {
super(endpoint, processor);
this.endpoint = endpoint;
this.uri = uri;
-
- file = parameters.get("file");
- url = parameters.get("url");
- // must remove the options this component supports
- parameters.remove("file");
- parameters.remove("url");
-
validateUri(uri);
}
@@ -83,10 +71,6 @@
while ((line = br.readLine()) != null) {
consume(line);
}
- } catch (IOException e) {
- throw new StreamComponentException(e);
- } catch (Exception e) {
- throw new StreamComponentException(e);
} finally {
br.close();
}
@@ -107,14 +91,14 @@
}
private InputStream resolveStreamFromUrl() throws IOException {
- String u = url;
+ String u = endpoint.getUrl();
URL url = new URL(u);
URLConnection c = url.openConnection();
return c.getInputStream();
}
private InputStream resolveStreamFromFile() throws IOException {
- String fileName = file != null ? file.trim() : "_file";
+ String fileName = endpoint.getFile() != null ? endpoint.getFile().trim() : "_file";
File f = new File(fileName);
if (LOG.isDebugEnabled()) {
LOG.debug("About to read from file: " + f);
@@ -123,7 +107,7 @@
return new FileInputStream(f);
}
- private void validateUri(String uri) throws Exception {
+ private void validateUri(String uri) throws IllegalArgumentException {
String[] s = uri.split(":");
if (s.length < 2) {
throw new IllegalArgumentException(INVALID_URI);
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Mon May 12 04:51:44 2008
@@ -16,36 +16,60 @@
*/
package org.apache.camel.component.stream;
-import java.util.Map;
-
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.Component;
+import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultEndpoint;
-public class StreamEndpoint extends DefaultEndpoint<StreamExchange> {
- Producer<StreamExchange> producer;
- private Map<String, String> parameters;
+public class StreamEndpoint extends DefaultEndpoint<Exchange> {
private String uri;
-
-
- public StreamEndpoint(StreamComponent component, String uri, String remaining,
- Map<String, String> parameters) throws Exception {
- super(uri, component);
- this.parameters = parameters;
- this.uri = uri;
- this.producer = new StreamProducer(this, uri, parameters);
+ private String file;
+ private String url;
+ private long delay;
+
+ public StreamEndpoint(String endpointUri, Component component) throws Exception {
+ super(endpointUri, component);
+ this.uri = endpointUri;
}
- public Consumer<StreamExchange> createConsumer(Processor processor) throws Exception {
- return new StreamConsumer(this, processor, uri, parameters);
+ public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
+ return new StreamConsumer(this, processor, uri);
}
- public Producer<StreamExchange> createProducer() throws Exception {
- return producer;
+ public Producer<Exchange> createProducer() throws Exception {
+ return new StreamProducer(this, uri);
}
public boolean isSingleton() {
return true;
}
+
+ // Properties
+ //-------------------------------------------------------------------------
+
+ public String getFile() {
+ return file;
+ }
+
+ public void setFile(String file) {
+ this.file = file;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java Mon May 12 04:51:44 2008
@@ -26,11 +26,6 @@
}
@Override
- public String toString() {
- return o.toString();
- }
-
- @Override
protected Object createBody() {
return o;
}
@@ -40,4 +35,9 @@
return o;
}
+ @Override
+ public String toString() {
+ return o.toString();
+ }
+
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Mon May 12 04:51:44 2008
@@ -26,41 +26,27 @@
import java.net.URLConnection;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.converter.ObjectConverter;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class StreamProducer extends DefaultProducer<StreamExchange> {
+public class StreamProducer extends DefaultProducer<Exchange> {
private static final transient Log LOG = LogFactory.getLog(StreamProducer.class);
private static final String TYPES = "in,out,err,file,url,header";
private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
- protected OutputStream outputStream = System.out;
+ private OutputStream outputStream = System.out;
+ private StreamEndpoint endpoint;
private String uri;
- private Map<String, String> parameters;
- private String delay;
- private String url;
- private String file;
- public StreamProducer(Endpoint<StreamExchange> endpoint, String uri, Map<String, String> parameters)
+ public StreamProducer(StreamEndpoint endpoint, String uri)
throws Exception {
super(endpoint);
- this.parameters = parameters;
-
- delay = parameters.get("delay");
- url = parameters.get("url");
- file = parameters.get("file");
- // must remove the parameters this component support
- parameters.remove("delay");
- parameters.remove("url");
- parameters.remove("file");
-
+ this.endpoint = endpoint;
validateUri(uri);
}
@@ -73,10 +59,7 @@
}
public void process(Exchange exchange) throws Exception {
- if (delay != null) {
- long ms = ObjectConverter.toLong(delay);
- delay(ms);
- }
+ delay(endpoint.getDelay());
if ("out".equals(uri)) {
outputStream = System.out;
@@ -85,7 +68,7 @@
} else if ("file".equals(uri)) {
outputStream = resolveStreamFromFile();
} else if ("header".equals(uri)) {
- outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"));
+ outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"), exchange);
} else if ("url".equals(uri)) {
outputStream = resolveStreamFromUrl();
}
@@ -93,14 +76,14 @@
}
private OutputStream resolveStreamFromUrl() throws IOException {
- String u = url;
+ String u = endpoint.getUrl();
URL url = new URL(u);
URLConnection c = url.openConnection();
return c.getOutputStream();
}
private OutputStream resolveStreamFromFile() throws IOException {
- String fileName = file != null ? file.trim() : "_file";
+ String fileName = endpoint.getFile() != null ? endpoint.getFile().trim() : "_file";
File f = new File(fileName);
if (LOG.isDebugEnabled()) {
LOG.debug("About to write to file: " + f);
@@ -109,15 +92,18 @@
return new FileOutputStream(f);
}
- private OutputStream resolveStreamFromHeader(Object o) throws StreamComponentException {
+ private OutputStream resolveStreamFromHeader(Object o, Exchange exchange) throws CamelExchangeException {
if (o != null && o instanceof OutputStream) {
return (OutputStream)o;
} else {
- throw new StreamComponentException("Expected OutputStream in header('stream'), found: " + o);
+ throw new CamelExchangeException("Expected OutputStream in header('stream'), found: " + o, exchange);
}
}
private void delay(long ms) throws InterruptedException {
+ if (ms == 0) {
+ return;
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Delaying " + ms + " millis");
}
Modified: activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java Mon May 12 04:51:44 2008
@@ -16,21 +16,17 @@
*/
package org.apache.camel.component.stream;
-import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
public class StreamRouteBuilderTest extends ContextTestSupport {
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- context = createCamelContext();
+
+ public void testStringContent() {
+ template.sendBody("direct:start", "<content/>");
}
- protected CamelContext createCamelContext() throws Exception {
- CamelContext camelContext = super.createCamelContext();
- camelContext.addComponent("stream", new StreamComponent());
- return camelContext;
+ public void testBinaryContent() {
+ template.sendBody("direct:start", new byte[] {1, 2, 3, 4});
}
protected RouteBuilder createRouteBuilder() {
@@ -38,17 +34,8 @@
public void configure() {
from("direct:start").setHeader("stream", constant(System.out))
.to("stream:err", "stream:out", "stream:file?file=/tmp/foo", "stream:header");
- // from("stream:in").to("stream:out",
- // "stream:err?delay=5000");
}
};
}
-
- public void testStringContent() {
- template.sendBody("direct:start", "<content/>");
- }
-
- public void testBinaryContent() {
- template.sendBody("direct:start", new byte[] {1, 2, 3, 4});
- }
+
}