You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/03/27 02:17:15 UTC
svn commit: r641677 - in /activemq/camel/trunk/components/camel-stream/src:
main/java/org/apache/camel/component/stream/
test/java/org/apache/camel/component/stream/
Author: ningjiang
Date: Wed Mar 26 18:17:06 2008
New Revision: 641677
URL: http://svn.apache.org/viewvc?rev=641677&view=rev
Log:
Fixed the camel-stream CS errors
Modified:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/ConsumingException.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/InputStreamHandler.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StdInHandler.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponentException.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamReceiver.java
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/ConsumingException.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/ConsumingException.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/ConsumingException.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/ConsumingException.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/InputStreamHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/InputStreamHandler.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/InputStreamHandler.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/InputStreamHandler.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +19,6 @@
public interface InputStreamHandler {
- public void consume() throws ConsumingException;
+ void consume() throws ConsumingException;
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StdInHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StdInHandler.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StdInHandler.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StdInHandler.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -34,7 +33,7 @@
this.sr = sr;
- }
+ }
public void consume() throws ConsumingException {
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,17 +23,16 @@
import org.apache.commons.logging.LogFactory;
public class StreamComponent extends DefaultComponent<StreamExchange> {
-
- /**
- * Component providing streams connectivity
- */
-
- private static final Log log = LogFactory.getLog(StreamComponent.class);
- @Override
- protected Endpoint<StreamExchange> createEndpoint(String uri,
- String remaining, Map parameters) throws Exception {
- return new StreamEndpoint(this, uri, remaining, parameters);
- }
+ /**
+ * Component providing streams connectivity
+ */
+
+
+ @Override
+ protected Endpoint<StreamExchange> createEndpoint(String uri, String remaining, Map parameters)
+ throws Exception {
+ return new StreamEndpoint(this, uri, remaining, parameters);
+ }
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponentException.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponentException.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponentException.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponentException.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,13 +18,13 @@
public class StreamComponentException extends Exception {
- private static final long serialVersionUID = 8064764690837846894L;
+ private static final long serialVersionUID = 8064764690837846894L;
- public StreamComponentException(String s) {
- super(s);
- }
+ public StreamComponentException(String s) {
+ super(s);
+ }
- public StreamComponentException(Throwable t) {
- super(t);
- }
+ public StreamComponentException(Throwable t) {
+ super(t);
+ }
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -37,106 +36,102 @@
import org.apache.commons.logging.LogFactory;
/**
- *
* Consumer that can read from any stream
- *
*/
public class StreamConsumer extends DefaultConsumer<StreamExchange> {
- private static final String TYPES = "in";
- private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{"
- + TYPES + "}'";
- private static final List<String> TYPES_LIST = Arrays.asList(TYPES
- .split(","));
- private static final Log log = LogFactory.getLog(StreamConsumer.class);
- Endpoint<StreamExchange> endpoint;
- private Map<String, String> parameters;
- private String uri;
- protected InputStream inputStream = System.in;
-
- public StreamConsumer(Endpoint<StreamExchange> endpoint,
- Processor processor, String uri, Map<String, String> parameters)
- throws Exception {
- super(endpoint, processor);
- this.endpoint = endpoint;
- this.parameters = parameters;
- validateUri(uri);
- log.debug("Stream consumer created");
- }
-
- @Override
- protected void doStart() throws Exception {
- super.doStart();
-
- if ("in".equals(uri)) {
- inputStream = System.in;
- } else if ("file".equals(uri)) {
- inputStream = resolveStreamFromFile();
- } else if ("url".equals(uri)) {
- inputStream = resolveStreamFromUrl();
- }
-
- BufferedReader br = new BufferedReader(new InputStreamReader(
- inputStream));
- String line = null;
- try {
- while ((line = br.readLine()) != null) {
- consume(line);
- }
- br.close();
- } catch (IOException e) {
- e.printStackTrace();
- throw new StreamComponentException(e);
- } catch (Exception e) {
- e.printStackTrace();
- throw new StreamComponentException(e);
- }
- }
-
- public void consume(Object o) throws Exception {
- Exchange exchange = endpoint.createExchange();
- exchange.setIn(new StreamMessage(o));
- getProcessor().process(exchange);
- }
-
- private InputStream resolveStreamFromUrl() throws IOException {
- String u = parameters.get("url");
- URL url = new URL(u);
- URLConnection c = url.openConnection();
- return c.getInputStream();
- }
-
- private InputStream resolveStreamFromFile() throws IOException {
- String fileName = parameters.get("file");
- fileName = fileName != null ? fileName.trim() : "_file";
- File f = new File(fileName);
- log.debug("About to read from file: " + f);
- f.createNewFile();
- return new FileInputStream(f);
- }
-
- private void validateUri(String uri) throws Exception {
- String[] s = uri.split(":");
- if (s.length < 2) {
- throw new Exception(INVALID_URI);
- }
- String[] t = s[1].split("\\?");
-
- if (t.length < 1) {
- throw new Exception(INVALID_URI);
- }
-
- this.uri = t[0].trim();
- if (!TYPES_LIST.contains(this.uri)) {
- throw new Exception(INVALID_URI);
- }
- }
-
- @Override
- public void stop() throws Exception {
- super.stop();
- if (inputStream != null)
- inputStream.close();
- }
+ private static final String TYPES = "in";
+ private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
+ private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
+ private static final Log LOG = LogFactory.getLog(StreamConsumer.class);
+ protected InputStream inputStream = System.in;
+ Endpoint<StreamExchange> endpoint;
+ private Map<String, String> parameters;
+ private String uri;
+
+
+ public StreamConsumer(Endpoint<StreamExchange> endpoint, Processor processor, String uri,
+ Map<String, String> parameters) throws Exception {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ this.parameters = parameters;
+ validateUri(uri);
+ LOG.debug("Stream consumer created");
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ if ("in".equals(uri)) {
+ inputStream = System.in;
+ } else if ("file".equals(uri)) {
+ inputStream = resolveStreamFromFile();
+ } else if ("url".equals(uri)) {
+ inputStream = resolveStreamFromUrl();
+ }
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
+ String line = null;
+ try {
+ while ((line = br.readLine()) != null) {
+ consume(line);
+ }
+ br.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new StreamComponentException(e);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new StreamComponentException(e);
+ }
+ }
+
+ public void consume(Object o) throws Exception {
+ Exchange exchange = endpoint.createExchange();
+ exchange.setIn(new StreamMessage(o));
+ getProcessor().process(exchange);
+ }
+
+ private InputStream resolveStreamFromUrl() throws IOException {
+ String u = parameters.get("url");
+ URL url = new URL(u);
+ URLConnection c = url.openConnection();
+ return c.getInputStream();
+ }
+
+ private InputStream resolveStreamFromFile() throws IOException {
+ String fileName = parameters.get("file");
+ fileName = fileName != null ? fileName.trim() : "_file";
+ File f = new File(fileName);
+ LOG.debug("About to read from file: " + f);
+ f.createNewFile();
+ return new FileInputStream(f);
+ }
+
+ private void validateUri(String uri) throws Exception {
+ String[] s = uri.split(":");
+ if (s.length < 2) {
+ throw new Exception(INVALID_URI);
+ }
+ String[] t = s[1].split("\\?");
+
+ if (t.length < 1) {
+ throw new Exception(INVALID_URI);
+ }
+
+ this.uri = t[0].trim();
+ if (!TYPES_LIST.contains(this.uri)) {
+ throw new Exception(INVALID_URI);
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ super.stop();
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,31 +26,31 @@
import org.apache.commons.logging.LogFactory;
public class StreamEndpoint extends DefaultEndpoint<StreamExchange> {
- Producer<StreamExchange> producer;
- private Map<String, String> parameters;
- private String uri;
- private static final Log log = LogFactory.getLog(StreamConsumer.class);
-
- public StreamEndpoint(StreamComponent component, String uri,
- String remaining, Map<String, String> parameters) throws Exception {
- super(uri, component);
- this.parameters = parameters;
- this.uri = uri;
- log.debug(uri + " / " + remaining + " / " + parameters);
- this.producer = new StreamProducer(this, uri, parameters);
-
- }
-
- public Consumer<StreamExchange> createConsumer(Processor p)
- throws Exception {
- return new StreamConsumer(this, p, uri, parameters);
- }
-
- public Producer<StreamExchange> createProducer() throws Exception {
- return producer;
- }
-
- public boolean isSingleton() {
- return true;
- }
+ private static final Log LOG = LogFactory.getLog(StreamConsumer.class);
+ Producer<StreamExchange> producer;
+ private Map<String, String> parameters;
+ private String uri;
+
+
+ public StreamEndpoint(StreamComponent component, String uri, String remaining,
+ Map<String, String> parameters) throws Exception {
+ super(uri, component);
+ this.parameters = parameters;
+ this.uri = uri;
+ LOG.debug(uri + " / " + remaining + " / " + parameters);
+ this.producer = new StreamProducer(this, uri, parameters);
+
+ }
+
+ public Consumer<StreamExchange> createConsumer(Processor p) throws Exception {
+ return new StreamConsumer(this, p, uri, parameters);
+ }
+
+ public Producer<StreamExchange> createProducer() throws Exception {
+ return producer;
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamExchange.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamMessage.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,25 +19,25 @@
import org.apache.camel.impl.DefaultMessage;
public class StreamMessage extends DefaultMessage {
- Object o;
+ Object o;
- public StreamMessage(Object o) {
- this.o = o;
- }
-
- @Override
- public String toString() {
- return o.toString();
- }
-
- @Override
- protected Object createBody() {
- return o;
- }
-
- @Override
- public Object getBody() {
- return o;
- }
+ public StreamMessage(Object o) {
+ this.o = o;
+ }
+
+ @Override
+ public String toString() {
+ return o.toString();
+ }
+
+ @Override
+ protected Object createBody() {
+ return o;
+ }
+
+ @Override
+ public Object getBody() {
+ return o;
+ }
-}
\ No newline at end of file
+}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -37,112 +36,107 @@
public class StreamProducer extends DefaultProducer<StreamExchange> {
- private static final Log log = LogFactory.getLog(StreamProducer.class);
- private static final String TYPES = "in,out,err,file,url,header";
- private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{"
- + TYPES + "}'";
-
- private static final List<String> TYPES_LIST = Arrays.asList(TYPES
- .split(","));
- private String uri;
- private Map<String, String> parameters;
- protected OutputStream outputStream = System.out;
-
- public StreamProducer(Endpoint<StreamExchange> endpoint, String uri,
- Map<String, String> parameters) throws Exception {
- super(endpoint);
- this.parameters = parameters;
- validateUri(uri);
- log.debug("Stream producer created");
- }
-
- public void process(Exchange exchange) throws Exception {
- if (parameters.get("delay") != null) {
- long ms = Long.valueOf(parameters.get("delay"));
- delay(ms);
- }
- if ("out".equals(uri)) {
- outputStream = System.out;
- } else if ("err".equals(uri)) {
- outputStream = System.err;
- } else if ("file".equals(uri)) {
- outputStream = resolveStreamFromFile();
- } else if ("header".equals(uri)) {
- outputStream = resolveStreamFromHeader(exchange.getIn().getHeader(
- "stream"));
- } else if ("url".equals(uri)) {
- outputStream = resolveStreamFromUrl();
- }
- writeToStream(exchange);
- }
-
- private OutputStream resolveStreamFromUrl() throws IOException {
- String u = parameters.get("url");
- URL url = new URL(u);
- URLConnection c = url.openConnection();
- return c.getOutputStream();
- }
-
- private OutputStream resolveStreamFromFile() throws IOException {
- String fileName = parameters.get("file");
- fileName = fileName != null ? fileName.trim() : "_file";
- File f = new File(fileName);
- log.debug("About to write to file: " + f);
- f.createNewFile();
- return new FileOutputStream(f);
- }
-
- private OutputStream resolveStreamFromHeader(Object o)
- throws StreamComponentException {
- if (o != null && o instanceof OutputStream)
- return (OutputStream) o;
- else {
- throw new StreamComponentException(
- "Expected OutputStream in header('stream'), found: " + o);
- }
- }
-
- private void delay(long ms) throws InterruptedException {
- log.debug("Delaying " + ms + " milliseconds");
- Thread.sleep(ms);
- }
-
- private void writeToStream(Exchange exchange) throws IOException {
- Object body = exchange.getIn().getBody();
- log.debug("Writing " + body + " to " + outputStream);
- if (body instanceof String) {
- log.debug("in text buffered mode");
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
- outputStream));
- bw.write((String) body + "\n");
- bw.flush();
- } else {
- log.debug("in binary stream mode");
- outputStream.write((byte[]) body);
- }
- }
-
- private void validateUri(String uri) throws Exception {
- String[] s = uri.split(":");
- if (s.length < 2) {
- throw new Exception(INVALID_URI);
- }
- String[] t = s[1].split("\\?");
-
- if (t.length < 1) {
- throw new Exception(INVALID_URI);
- }
- this.uri = t[0].trim();
-
- if (!TYPES_LIST.contains(this.uri)) {
- throw new Exception(INVALID_URI);
- }
- }
-
- @Override
- public void stop() throws Exception {
- super.stop();
- if (outputStream != null)
- outputStream.close();
- }
-}
\ No newline at end of file
+ private static final Log LOG = LogFactory.getLog(StreamProducer.class);
+ private static final String TYPES = "in,out,err,file,url,header";
+ private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
+ private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
+ protected OutputStream outputStream = System.out;
+ private String uri;
+ private Map<String, String> parameters;
+
+
+ public StreamProducer(Endpoint<StreamExchange> endpoint, String uri, Map<String, String> parameters)
+ throws Exception {
+ super(endpoint);
+ this.parameters = parameters;
+ validateUri(uri);
+ LOG.debug("Stream producer created");
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ if (parameters.get("delay") != null) {
+ long ms = Long.valueOf(parameters.get("delay"));
+ delay(ms);
+ }
+ if ("out".equals(uri)) {
+ outputStream = System.out;
+ } else if ("err".equals(uri)) {
+ outputStream = System.err;
+ } else if ("file".equals(uri)) {
+ outputStream = resolveStreamFromFile();
+ } else if ("header".equals(uri)) {
+ outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"));
+ } else if ("url".equals(uri)) {
+ outputStream = resolveStreamFromUrl();
+ }
+ writeToStream(exchange);
+ }
+
+ private OutputStream resolveStreamFromUrl() throws IOException {
+ String u = parameters.get("url");
+ URL url = new URL(u);
+ URLConnection c = url.openConnection();
+ return c.getOutputStream();
+ }
+
+ private OutputStream resolveStreamFromFile() throws IOException {
+ String fileName = parameters.get("file");
+ fileName = fileName != null ? fileName.trim() : "_file";
+ File f = new File(fileName);
+ LOG.debug("About to write to file: " + f);
+ f.createNewFile();
+ return new FileOutputStream(f);
+ }
+
+ private OutputStream resolveStreamFromHeader(Object o) throws StreamComponentException {
+ if (o != null && o instanceof OutputStream) {
+ return (OutputStream)o;
+ } else {
+ throw new StreamComponentException("Expected OutputStream in header('stream'), found: " + o);
+ }
+ }
+
+ private void delay(long ms) throws InterruptedException {
+ LOG.debug("Delaying " + ms + " milliseconds");
+ Thread.sleep(ms);
+ }
+
+ private void writeToStream(Exchange exchange) throws IOException {
+ Object body = exchange.getIn().getBody();
+ LOG.debug("Writing " + body + " to " + outputStream);
+ if (body instanceof String) {
+ LOG.debug("in text buffered mode");
+ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream));
+ bw.write((String)body + "\n");
+ bw.flush();
+ } else {
+ LOG.debug("in binary stream mode");
+ outputStream.write((byte[])body);
+ }
+ }
+
+ private void validateUri(String uri) throws Exception {
+ String[] s = uri.split(":");
+ if (s.length < 2) {
+ throw new Exception(INVALID_URI);
+ }
+ String[] t = s[1].split("\\?");
+
+ if (t.length < 1) {
+ throw new Exception(INVALID_URI);
+ }
+ this.uri = t[0].trim();
+
+ if (!TYPES_LIST.contains(this.uri)) {
+ throw new Exception(INVALID_URI);
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ super.stop();
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ }
+}
Modified: activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamReceiver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamReceiver.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamReceiver.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamReceiver.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,6 +18,6 @@
public interface StreamReceiver {
- public int pass(String s) throws Exception;
+ int pass(String s) throws Exception;
}
Modified: activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java?rev=641677&r1=641676&r2=641677&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java (original)
+++ activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java Wed Mar 26 18:17:06 2008
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,37 +19,36 @@
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.stream.StreamComponent;
public class StreamRouteBuilderTest extends ContextTestSupport {
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- context = createCamelContext();
- }
-
- protected CamelContext createCamelContext() throws Exception {
- CamelContext camelContext = super.createCamelContext();
- camelContext.addComponent("stream", new StreamComponent());
- return camelContext;
- }
-
- protected RouteBuilder createRouteBuilder() {
- return new RouteBuilder() {
- public void configure() {
- from("direct:start").setHeader("stream", constant(System.out))
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ context = createCamelContext();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ camelContext.addComponent("stream", new StreamComponent());
+ return camelContext;
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start").setHeader("stream", constant(System.out))
.to("stream:err", "stream:out", "stream:file?file=/tmp/foo", "stream:header");
- //from("stream:in").to("stream:out",
- // "stream:err?delay=5000");
- }
- };
- }
-
- public void testStringContent() {
- template.sendBody("direct:start", "<content/>");
- }
-
- public void testBinaryContent() {
- template.sendBody("direct:start", new byte[] { 1, 2, 3, 4 });
- }
+ // from("stream:in").to("stream:out",
+ // "stream:err?delay=5000");
+ }
+ };
+ }
+
+ public void testStringContent() {
+ template.sendBody("direct:start", "<content/>");
+ }
+
+ public void testBinaryContent() {
+ template.sendBody("direct:start", new byte[] {1, 2, 3, 4});
+ }
}