You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/05/12 13:51:44 UTC

svn commit: r655481 - in /activemq/camel/trunk/components/camel-stream/src: main/java/org/apache/camel/component/stream/ test/java/org/apache/camel/component/stream/

Author: davsclaus
Date: Mon May 12 04:51:44 2008
New Revision: 655481

URL: http://svn.apache.org/viewvc?rev=655481&view=rev
Log:
CAMEL-433: using the new getAndRemoveParameter programming best practice

Removed:
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponentException.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java
Modified:
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
    activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java Mon May 12 04:51:44 2008
@@ -18,17 +18,18 @@
 
 import java.util.Map;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultComponent;
 
 /**
  * Component providing streams connectivity
  */
-public class StreamComponent extends DefaultComponent<StreamExchange> {
+public class StreamComponent extends DefaultComponent<Exchange> {
 
     @Override
-    protected Endpoint<StreamExchange> createEndpoint(String uri, String remaining, Map parameters)
+    protected Endpoint<Exchange> createEndpoint(String uri, String remaining, Map parameters)
         throws Exception {
-        return new StreamEndpoint(this, uri, remaining, parameters);
+        return new StreamEndpoint(uri, this);
     }
 
 }

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Mon May 12 04:51:44 2008
@@ -26,9 +26,7 @@
 import java.net.URLConnection;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
@@ -38,30 +36,20 @@
 /**
  * Consumer that can read from any stream
  */
-public class StreamConsumer extends DefaultConsumer<StreamExchange> {
+public class StreamConsumer extends DefaultConsumer<Exchange> {
 
     private static final transient Log LOG = LogFactory.getLog(StreamConsumer.class);
     private static final String TYPES = "in";
     private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
     private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
-    protected InputStream inputStream = System.in;
-    protected Endpoint<StreamExchange> endpoint;
+    private InputStream inputStream = System.in;
+    private StreamEndpoint endpoint;
     private String uri;
-    private String file;
-    private String url;
 
-    public StreamConsumer(Endpoint<StreamExchange> endpoint, Processor processor, String uri,
-                          Map<String, String> parameters) throws Exception {
+    public StreamConsumer(StreamEndpoint endpoint, Processor processor, String uri) throws Exception {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.uri = uri;
-
-        file = parameters.get("file");
-        url = parameters.get("url");
-        // must remove the options this component supports
-        parameters.remove("file");
-        parameters.remove("url");
-
         validateUri(uri);
     }
 
@@ -83,10 +71,6 @@
             while ((line = br.readLine()) != null) {
                 consume(line);
             }
-        } catch (IOException e) {
-            throw new StreamComponentException(e);
-        } catch (Exception e) {
-            throw new StreamComponentException(e);
         } finally {
             br.close();
         }
@@ -107,14 +91,14 @@
     }
 
     private InputStream resolveStreamFromUrl() throws IOException {
-        String u = url;
+        String u = endpoint.getUrl();
         URL url = new URL(u);
         URLConnection c = url.openConnection();
         return c.getInputStream();
     }
 
     private InputStream resolveStreamFromFile() throws IOException {
-        String fileName = file != null ? file.trim() : "_file";
+        String fileName = endpoint.getFile() != null ? endpoint.getFile().trim() : "_file";
         File f = new File(fileName);
         if (LOG.isDebugEnabled()) {
             LOG.debug("About to read from file: " + f);
@@ -123,7 +107,7 @@
         return new FileInputStream(f);
     }
 
-    private void validateUri(String uri) throws Exception {
+    private void validateUri(String uri) throws IllegalArgumentException {
         String[] s = uri.split(":");
         if (s.length < 2) {
             throw new IllegalArgumentException(INVALID_URI);

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Mon May 12 04:51:44 2008
@@ -16,36 +16,60 @@
  */
 package org.apache.camel.component.stream;
 
-import java.util.Map;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.Component;
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultEndpoint;
 
-public class StreamEndpoint extends DefaultEndpoint<StreamExchange> {
-    Producer<StreamExchange> producer;
-    private Map<String, String> parameters;
+public class StreamEndpoint extends DefaultEndpoint<Exchange> {
     private String uri;
-
-
-    public StreamEndpoint(StreamComponent component, String uri, String remaining,
-                          Map<String, String> parameters) throws Exception {
-        super(uri, component);
-        this.parameters = parameters;
-        this.uri = uri;
-        this.producer = new StreamProducer(this, uri, parameters);
+    private String file;
+    private String url;
+    private long delay;
+
+    public StreamEndpoint(String endpointUri, Component component) throws Exception {
+        super(endpointUri, component);
+        this.uri = endpointUri;
     }
 
-    public Consumer<StreamExchange> createConsumer(Processor processor) throws Exception {
-        return new StreamConsumer(this, processor, uri, parameters);
+    public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
+        return new StreamConsumer(this, processor, uri);
     }
 
-    public Producer<StreamExchange> createProducer() throws Exception {
-        return producer;
+    public Producer<Exchange> createProducer() throws Exception {
+        return new StreamProducer(this, uri);
     }
 
     public boolean isSingleton() {
         return true;
     }
+
+    // Properties
+    //-------------------------------------------------------------------------
+
+    public String getFile() {
+        return file;
+    }
+
+    public void setFile(String file) {
+        this.file = file;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
 }

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java Mon May 12 04:51:44 2008
@@ -26,11 +26,6 @@
     }
 
     @Override
-    public String toString() {
-        return o.toString();
-    }
-
-    @Override
     protected Object createBody() {
         return o;
     }
@@ -40,4 +35,9 @@
         return o;
     }
 
+    @Override
+    public String toString() {
+        return o.toString();
+    }
+
 }

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Mon May 12 04:51:44 2008
@@ -26,41 +26,27 @@
 import java.net.URLConnection;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.converter.ObjectConverter;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class StreamProducer extends DefaultProducer<StreamExchange> {
+public class StreamProducer extends DefaultProducer<Exchange> {
 
     private static final transient Log LOG = LogFactory.getLog(StreamProducer.class);
     private static final String TYPES = "in,out,err,file,url,header";
     private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
     private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
-    protected OutputStream outputStream = System.out;
+    private OutputStream outputStream = System.out;
+    private StreamEndpoint endpoint;
     private String uri;
-    private Map<String, String> parameters;
-    private String delay;
-    private String url;
-    private String file;
 
-    public StreamProducer(Endpoint<StreamExchange> endpoint, String uri, Map<String, String> parameters)
+    public StreamProducer(StreamEndpoint endpoint, String uri)
         throws Exception {
         super(endpoint);
-        this.parameters = parameters;
-
-        delay = parameters.get("delay");
-        url = parameters.get("url");
-        file = parameters.get("file");
-        // must remove the parameters this component support
-        parameters.remove("delay");
-        parameters.remove("url");
-        parameters.remove("file");
-
+        this.endpoint = endpoint;
         validateUri(uri);
     }
 
@@ -73,10 +59,7 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        if (delay != null) {
-            long ms = ObjectConverter.toLong(delay);
-            delay(ms);
-        }
+        delay(endpoint.getDelay());
 
         if ("out".equals(uri)) {
             outputStream = System.out;
@@ -85,7 +68,7 @@
         } else if ("file".equals(uri)) {
             outputStream = resolveStreamFromFile();
         } else if ("header".equals(uri)) {
-            outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"));
+            outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"), exchange);
         } else if ("url".equals(uri)) {
             outputStream = resolveStreamFromUrl();
         }
@@ -93,14 +76,14 @@
     }
 
     private OutputStream resolveStreamFromUrl() throws IOException {
-        String u = url;
+        String u = endpoint.getUrl();
         URL url = new URL(u);
         URLConnection c = url.openConnection();
         return c.getOutputStream();
     }
 
     private OutputStream resolveStreamFromFile() throws IOException {
-        String fileName = file != null ? file.trim() : "_file";
+        String fileName = endpoint.getFile() != null ? endpoint.getFile().trim() : "_file";
         File f = new File(fileName);
         if (LOG.isDebugEnabled()) {
             LOG.debug("About to write to file: " + f);
@@ -109,15 +92,18 @@
         return new FileOutputStream(f);
     }
 
-    private OutputStream resolveStreamFromHeader(Object o) throws StreamComponentException {
+    private OutputStream resolveStreamFromHeader(Object o, Exchange exchange) throws CamelExchangeException {
         if (o != null && o instanceof OutputStream) {
             return (OutputStream)o;
         } else {
-            throw new StreamComponentException("Expected OutputStream in header('stream'), found: " + o);
+            throw new CamelExchangeException("Expected OutputStream in header('stream'), found: " + o, exchange);
         }
     }
 
     private void delay(long ms) throws InterruptedException {
+        if (ms == 0) {
+            return;
+        }
         if (LOG.isDebugEnabled()) {
             LOG.debug("Delaying " + ms + " millis");
         }

Modified: activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java?rev=655481&r1=655480&r2=655481&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java Mon May 12 04:51:44 2008
@@ -16,21 +16,17 @@
  */
 package org.apache.camel.component.stream;
 
-import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 
 public class StreamRouteBuilderTest extends ContextTestSupport {
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        context = createCamelContext();
+
+    public void testStringContent() {
+        template.sendBody("direct:start", "<content/>");
     }
 
-    protected CamelContext createCamelContext() throws Exception {
-        CamelContext camelContext = super.createCamelContext();
-        camelContext.addComponent("stream", new StreamComponent());
-        return camelContext;
+    public void testBinaryContent() {
+        template.sendBody("direct:start", new byte[] {1, 2, 3, 4});
     }
 
     protected RouteBuilder createRouteBuilder() {
@@ -38,17 +34,8 @@
             public void configure() {
                 from("direct:start").setHeader("stream", constant(System.out))
                     .to("stream:err", "stream:out", "stream:file?file=/tmp/foo", "stream:header");
-                // from("stream:in").to("stream:out",
-                // "stream:err?delay=5000");
             }
         };
     }
-
-    public void testStringContent() {
-        template.sendBody("direct:start", "<content/>");
-    }
-
-    public void testBinaryContent() {
-        template.sendBody("direct:start", new byte[] {1, 2, 3, 4});
-    }
+    
 }