You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/05/11 18:32:56 UTC

svn commit: r655332 - /activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/

Author: davsclaus
Date: Sun May 11 09:32:56 2008
New Revision: 655332

URL: http://svn.apache.org/viewvc?rev=655332&view=rev
Log:
CAMEL-433: better validation when creating an endpoint from uri

Removed:
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/ConsumingException.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/InputStreamHandler.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StdInHandler.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamReceiver.java
Modified:
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
    activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java Sun May 11 09:32:56 2008
@@ -19,16 +19,12 @@
 import java.util.Map;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
+/**
+ * Component providing streams connectivity
+ */
 public class StreamComponent extends DefaultComponent<StreamExchange> {
 
-    /**
-     * Component providing streams connectivity
-     */
-
-
     @Override
     protected Endpoint<StreamExchange> createEndpoint(String uri, String remaining, Map parameters)
         throws Exception {

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Sun May 11 09:32:56 2008
@@ -38,26 +38,31 @@
 /**
  * Consumer that can read from any stream
  */
-
 public class StreamConsumer extends DefaultConsumer<StreamExchange> {
 
+    private static final transient Log LOG = LogFactory.getLog(StreamConsumer.class);
     private static final String TYPES = "in";
     private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
     private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
-    private static final Log LOG = LogFactory.getLog(StreamConsumer.class);
     protected InputStream inputStream = System.in;
-    Endpoint<StreamExchange> endpoint;
-    private Map<String, String> parameters;
+    protected Endpoint<StreamExchange> endpoint;
     private String uri;
-
+    private String file;
+    private String url;
 
     public StreamConsumer(Endpoint<StreamExchange> endpoint, Processor processor, String uri,
                           Map<String, String> parameters) throws Exception {
         super(endpoint, processor);
         this.endpoint = endpoint;
-        this.parameters = parameters;
+        this.uri = uri;
+
+        file = parameters.get("file");
+        url = parameters.get("url");
+        // must remove the options this component supports
+        parameters.remove("file");
+        parameters.remove("url");
+
         validateUri(uri);
-        LOG.debug("Stream consumer created");
     }
 
     @Override
@@ -73,19 +78,26 @@
         }
 
         BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
-        String line = null;
+        String line;
         try {
             while ((line = br.readLine()) != null) {
                 consume(line);
             }
-            br.close();
         } catch (IOException e) {
-            e.printStackTrace();
             throw new StreamComponentException(e);
         } catch (Exception e) {
-            e.printStackTrace();
             throw new StreamComponentException(e);
+        } finally {
+            br.close();
+        }
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (inputStream != null) {
+            inputStream.close();
         }
+        super.doStop();
     }
 
     public void consume(Object o) throws Exception {
@@ -95,17 +107,18 @@
     }
 
     private InputStream resolveStreamFromUrl() throws IOException {
-        String u = parameters.get("url");
+        String u = url;
         URL url = new URL(u);
         URLConnection c = url.openConnection();
         return c.getInputStream();
     }
 
     private InputStream resolveStreamFromFile() throws IOException {
-        String fileName = parameters.get("file");
-        fileName = fileName != null ? fileName.trim() : "_file";
+        String fileName = file != null ? file.trim() : "_file";
         File f = new File(fileName);
-        LOG.debug("About to read from file: " + f);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("About to read from file: " + f);
+        }
         f.createNewFile();
         return new FileInputStream(f);
     }
@@ -113,25 +126,18 @@
     private void validateUri(String uri) throws Exception {
         String[] s = uri.split(":");
         if (s.length < 2) {
-            throw new Exception(INVALID_URI);
+            throw new IllegalArgumentException(INVALID_URI);
         }
         String[] t = s[1].split("\\?");
 
         if (t.length < 1) {
-            throw new Exception(INVALID_URI);
+            throw new IllegalArgumentException(INVALID_URI);
         }
 
         this.uri = t[0].trim();
         if (!TYPES_LIST.contains(this.uri)) {
-            throw new Exception(INVALID_URI);
+            throw new IllegalArgumentException(INVALID_URI);
         }
     }
 
-    @Override
-    public void stop() throws Exception {
-        super.stop();
-        if (inputStream != null) {
-            inputStream.close();
-        }
-    }
 }

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Sun May 11 09:32:56 2008
@@ -22,11 +22,8 @@
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 public class StreamEndpoint extends DefaultEndpoint<StreamExchange> {
-    private static final Log LOG = LogFactory.getLog(StreamConsumer.class);
     Producer<StreamExchange> producer;
     private Map<String, String> parameters;
     private String uri;
@@ -37,13 +34,11 @@
         super(uri, component);
         this.parameters = parameters;
         this.uri = uri;
-        LOG.debug(uri + " / " + remaining + " / " + parameters);
         this.producer = new StreamProducer(this, uri, parameters);
-
     }
 
-    public Consumer<StreamExchange> createConsumer(Processor p) throws Exception {
-        return new StreamConsumer(this, p, uri, parameters);
+    public Consumer<StreamExchange> createConsumer(Processor processor) throws Exception {
+        return new StreamConsumer(this, processor, uri, parameters);
     }
 
     public Producer<StreamExchange> createProducer() throws Exception {

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java Sun May 11 09:32:56 2008
@@ -28,6 +28,5 @@
 
     public StreamExchange(DefaultExchange parent) {
         super(parent);
-        // TODO Auto-generated constructor stub
     }
 }
\ No newline at end of file

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java Sun May 11 09:32:56 2008
@@ -19,7 +19,7 @@
 import org.apache.camel.impl.DefaultMessage;
 
 public class StreamMessage extends DefaultMessage {
-    Object o;
+    private Object o;
 
     public StreamMessage(Object o) {
         this.o = o;

Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Sun May 11 09:32:56 2008
@@ -30,34 +30,54 @@
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class StreamProducer extends DefaultProducer<StreamExchange> {
 
-    private static final Log LOG = LogFactory.getLog(StreamProducer.class);
+    private static final transient Log LOG = LogFactory.getLog(StreamProducer.class);
     private static final String TYPES = "in,out,err,file,url,header";
     private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
     private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
     protected OutputStream outputStream = System.out;
     private String uri;
     private Map<String, String> parameters;
-
+    private String delay;
+    private String url;
+    private String file;
 
     public StreamProducer(Endpoint<StreamExchange> endpoint, String uri, Map<String, String> parameters)
         throws Exception {
         super(endpoint);
         this.parameters = parameters;
+
+        delay = parameters.get("delay");
+        url = parameters.get("url");
+        file = parameters.get("file");
+        // must remove the parameters this component supports
+        parameters.remove("delay");
+        parameters.remove("url");
+        parameters.remove("file");
+
         validateUri(uri);
-        LOG.debug("Stream producer created");
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (outputStream != null) {
+            outputStream.close();
+        }
+        super.doStop();
     }
 
     public void process(Exchange exchange) throws Exception {
-        if (parameters.get("delay") != null) {
-            long ms = Long.valueOf(parameters.get("delay"));
+        if (delay != null) {
+            long ms = ObjectConverter.toLong(delay);
             delay(ms);
         }
+
         if ("out".equals(uri)) {
             outputStream = System.out;
         } else if ("err".equals(uri)) {
@@ -73,17 +93,18 @@
     }
 
     private OutputStream resolveStreamFromUrl() throws IOException {
-        String u = parameters.get("url");
+        String u = url;
         URL url = new URL(u);
         URLConnection c = url.openConnection();
         return c.getOutputStream();
     }
 
     private OutputStream resolveStreamFromFile() throws IOException {
-        String fileName = parameters.get("file");
-        fileName = fileName != null ? fileName.trim() : "_file";
+        String fileName = file != null ? file.trim() : "_file";
         File f = new File(fileName);
-        LOG.debug("About to write to file: " + f);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("About to write to file: " + f);
+        }
         f.createNewFile();
         return new FileOutputStream(f);
     }
@@ -97,18 +118,24 @@
     }
 
     private void delay(long ms) throws InterruptedException {
-        LOG.debug("Delaying " + ms + " milliseconds");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Delaying " + ms + " millis");
+        }
         Thread.sleep(ms);
     }
 
     private void writeToStream(Exchange exchange) throws IOException {
         Object body = exchange.getIn().getBody();
-        LOG.debug("Writing " + body + " to " + outputStream);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing " + body + " to " + outputStream);
+        }
         if (body instanceof String) {
             LOG.debug("in text buffered mode");
             BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream));
-            bw.write((String)body + "\n");
+            bw.write((String)body);
+            bw.write("\n");
             bw.flush();
+            bw.close();
         } else {
             LOG.debug("in binary stream mode");
             outputStream.write((byte[])body);
@@ -118,25 +145,19 @@
     private void validateUri(String uri) throws Exception {
         String[] s = uri.split(":");
         if (s.length < 2) {
-            throw new Exception(INVALID_URI);
+            throw new IllegalArgumentException(INVALID_URI);
         }
         String[] t = s[1].split("\\?");
 
         if (t.length < 1) {
-            throw new Exception(INVALID_URI);
+            throw new IllegalArgumentException(INVALID_URI);
         }
         this.uri = t[0].trim();
 
         if (!TYPES_LIST.contains(this.uri)) {
-            throw new Exception(INVALID_URI);
+            throw new IllegalArgumentException(INVALID_URI);
         }
     }
 
-    @Override
-    public void stop() throws Exception {
-        super.stop();
-        if (outputStream != null) {
-            outputStream.close();
-        }
-    }
 }
+