You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/05/11 18:32:56 UTC
svn commit: r655332 -
/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/
Author: davsclaus
Date: Sun May 11 09:32:56 2008
New Revision: 655332
URL: http://svn.apache.org/viewvc?rev=655332&view=rev
Log:
CAMEL-433: better validation when creating an endpoint from uri
Removed:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/ConsumingException.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/InputStreamHandler.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StdInHandler.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamReceiver.java
Modified:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java Sun May 11 09:32:56 2008
@@ -19,16 +19,12 @@
import java.util.Map;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+/**
+ * Component providing streams connectivity
+ */
public class StreamComponent extends DefaultComponent<StreamExchange> {
- /**
- * Component providing streams connectivity
- */
-
-
@Override
protected Endpoint<StreamExchange> createEndpoint(String uri, String remaining, Map parameters)
throws Exception {
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Sun May 11 09:32:56 2008
@@ -38,26 +38,31 @@
/**
* Consumer that can read from any stream
*/
-
public class StreamConsumer extends DefaultConsumer<StreamExchange> {
+ private static final transient Log LOG = LogFactory.getLog(StreamConsumer.class);
private static final String TYPES = "in";
private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
- private static final Log LOG = LogFactory.getLog(StreamConsumer.class);
protected InputStream inputStream = System.in;
- Endpoint<StreamExchange> endpoint;
- private Map<String, String> parameters;
+ protected Endpoint<StreamExchange> endpoint;
private String uri;
-
+ private String file;
+ private String url;
public StreamConsumer(Endpoint<StreamExchange> endpoint, Processor processor, String uri,
Map<String, String> parameters) throws Exception {
super(endpoint, processor);
this.endpoint = endpoint;
- this.parameters = parameters;
+ this.uri = uri;
+
+ file = parameters.get("file");
+ url = parameters.get("url");
+ // must remove the options this component supports
+ parameters.remove("file");
+ parameters.remove("url");
+
validateUri(uri);
- LOG.debug("Stream consumer created");
}
@Override
@@ -73,19 +78,26 @@
}
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
- String line = null;
+ String line;
try {
while ((line = br.readLine()) != null) {
consume(line);
}
- br.close();
} catch (IOException e) {
- e.printStackTrace();
throw new StreamComponentException(e);
} catch (Exception e) {
- e.printStackTrace();
throw new StreamComponentException(e);
+ } finally {
+ br.close();
+ }
+ }
+
+ @Override
+ public void doStop() throws Exception {
+ if (inputStream != null) {
+ inputStream.close();
}
+ super.doStop();
}
public void consume(Object o) throws Exception {
@@ -95,17 +107,18 @@
}
private InputStream resolveStreamFromUrl() throws IOException {
- String u = parameters.get("url");
+ String u = url;
URL url = new URL(u);
URLConnection c = url.openConnection();
return c.getInputStream();
}
private InputStream resolveStreamFromFile() throws IOException {
- String fileName = parameters.get("file");
- fileName = fileName != null ? fileName.trim() : "_file";
+ String fileName = file != null ? file.trim() : "_file";
File f = new File(fileName);
- LOG.debug("About to read from file: " + f);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("About to read from file: " + f);
+ }
f.createNewFile();
return new FileInputStream(f);
}
@@ -113,25 +126,18 @@
private void validateUri(String uri) throws Exception {
String[] s = uri.split(":");
if (s.length < 2) {
- throw new Exception(INVALID_URI);
+ throw new IllegalArgumentException(INVALID_URI);
}
String[] t = s[1].split("\\?");
if (t.length < 1) {
- throw new Exception(INVALID_URI);
+ throw new IllegalArgumentException(INVALID_URI);
}
this.uri = t[0].trim();
if (!TYPES_LIST.contains(this.uri)) {
- throw new Exception(INVALID_URI);
+ throw new IllegalArgumentException(INVALID_URI);
}
}
- @Override
- public void stop() throws Exception {
- super.stop();
- if (inputStream != null) {
- inputStream.close();
- }
- }
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Sun May 11 09:32:56 2008
@@ -22,11 +22,8 @@
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
public class StreamEndpoint extends DefaultEndpoint<StreamExchange> {
- private static final Log LOG = LogFactory.getLog(StreamConsumer.class);
Producer<StreamExchange> producer;
private Map<String, String> parameters;
private String uri;
@@ -37,13 +34,11 @@
super(uri, component);
this.parameters = parameters;
this.uri = uri;
- LOG.debug(uri + " / " + remaining + " / " + parameters);
this.producer = new StreamProducer(this, uri, parameters);
-
}
- public Consumer<StreamExchange> createConsumer(Processor p) throws Exception {
- return new StreamConsumer(this, p, uri, parameters);
+ public Consumer<StreamExchange> createConsumer(Processor processor) throws Exception {
+ return new StreamConsumer(this, processor, uri, parameters);
}
public Producer<StreamExchange> createProducer() throws Exception {
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java Sun May 11 09:32:56 2008
@@ -28,6 +28,5 @@
public StreamExchange(DefaultExchange parent) {
super(parent);
- // TODO Auto-generated constructor stub
}
}
\ No newline at end of file
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java Sun May 11 09:32:56 2008
@@ -19,7 +19,7 @@
import org.apache.camel.impl.DefaultMessage;
public class StreamMessage extends DefaultMessage {
- Object o;
+ private Object o;
public StreamMessage(Object o) {
this.o = o;
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=655332&r1=655331&r2=655332&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Sun May 11 09:32:56 2008
@@ -30,34 +30,54 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class StreamProducer extends DefaultProducer<StreamExchange> {
- private static final Log LOG = LogFactory.getLog(StreamProducer.class);
+ private static final transient Log LOG = LogFactory.getLog(StreamProducer.class);
private static final String TYPES = "in,out,err,file,url,header";
private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
protected OutputStream outputStream = System.out;
private String uri;
private Map<String, String> parameters;
-
+ private String delay;
+ private String url;
+ private String file;
public StreamProducer(Endpoint<StreamExchange> endpoint, String uri, Map<String, String> parameters)
throws Exception {
super(endpoint);
this.parameters = parameters;
+
+ delay = parameters.get("delay");
+ url = parameters.get("url");
+ file = parameters.get("file");
+ // must remove the parameters this component supports
+ parameters.remove("delay");
+ parameters.remove("url");
+ parameters.remove("file");
+
validateUri(uri);
- LOG.debug("Stream producer created");
+ }
+
+ @Override
+ public void doStop() throws Exception {
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ super.doStop();
}
public void process(Exchange exchange) throws Exception {
- if (parameters.get("delay") != null) {
- long ms = Long.valueOf(parameters.get("delay"));
+ if (delay != null) {
+ long ms = ObjectConverter.toLong(delay);
delay(ms);
}
+
if ("out".equals(uri)) {
outputStream = System.out;
} else if ("err".equals(uri)) {
@@ -73,17 +93,18 @@
}
private OutputStream resolveStreamFromUrl() throws IOException {
- String u = parameters.get("url");
+ String u = url;
URL url = new URL(u);
URLConnection c = url.openConnection();
return c.getOutputStream();
}
private OutputStream resolveStreamFromFile() throws IOException {
- String fileName = parameters.get("file");
- fileName = fileName != null ? fileName.trim() : "_file";
+ String fileName = file != null ? file.trim() : "_file";
File f = new File(fileName);
- LOG.debug("About to write to file: " + f);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("About to write to file: " + f);
+ }
f.createNewFile();
return new FileOutputStream(f);
}
@@ -97,18 +118,24 @@
}
private void delay(long ms) throws InterruptedException {
- LOG.debug("Delaying " + ms + " milliseconds");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delaying " + ms + " millis");
+ }
Thread.sleep(ms);
}
private void writeToStream(Exchange exchange) throws IOException {
Object body = exchange.getIn().getBody();
- LOG.debug("Writing " + body + " to " + outputStream);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing " + body + " to " + outputStream);
+ }
if (body instanceof String) {
LOG.debug("in text buffered mode");
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream));
- bw.write((String)body + "\n");
+ bw.write((String)body);
+ bw.write("\n");
bw.flush();
+ bw.close();
} else {
LOG.debug("in binary stream mode");
outputStream.write((byte[])body);
@@ -118,25 +145,19 @@
private void validateUri(String uri) throws Exception {
String[] s = uri.split(":");
if (s.length < 2) {
- throw new Exception(INVALID_URI);
+ throw new IllegalArgumentException(INVALID_URI);
}
String[] t = s[1].split("\\?");
if (t.length < 1) {
- throw new Exception(INVALID_URI);
+ throw new IllegalArgumentException(INVALID_URI);
}
this.uri = t[0].trim();
if (!TYPES_LIST.contains(this.uri)) {
- throw new Exception(INVALID_URI);
+ throw new IllegalArgumentException(INVALID_URI);
}
}
- @Override
- public void stop() throws Exception {
- super.stop();
- if (outputStream != null) {
- outputStream.close();
- }
- }
}
+