You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/04/01 11:55:00 UTC
svn commit: r760833 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/converter/
camel-core/src/main/java/org/apache/camel/converter/jaxp/
camel-core/src/main/java/org/apache/camel/converter/stream/
camel-core/src/test/java/org/apache/camel/...
Author: davsclaus
Date: Wed Apr 1 09:54:55 2009
New Revision: 760833
URL: http://svn.apache.org/viewvc?rev=760833&view=rev
Log:
CAMEL-1503: First part of doing SMX-Camel with XML streams easier.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java (with props)
camel/trunk/components/camel-jms/src/test/data/
camel/trunk/components/camel-jms/src/test/data/message1.xml (with props)
camel/trunk/components/camel-jms/src/test/data/message2.xml (with props)
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java Wed Apr 1 09:54:55 2009
@@ -42,6 +42,7 @@
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.net.URL;
+import java.nio.CharBuffer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.dom.DOMSource;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java Wed Apr 1 09:54:55 2009
@@ -16,11 +16,15 @@
*/
package org.apache.camel.converter;
+import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Some core java.lang based <a
@@ -31,6 +35,8 @@
@Converter
public final class ObjectConverter {
+ private static final transient Log LOG = LogFactory.getLog(ObjectConverter.class);
+
/**
* Utility classes should not have a public constructor.
*/
@@ -102,8 +108,23 @@
}
@Converter
- public static byte[] toByteArray(String value) {
- return value.getBytes();
+ public static byte[] toByteArray(String value, Exchange exchange) {
+ byte[] bytes = null;
+ if (exchange != null) {
+ String charsetName = exchange.getProperty(Exchange.CHARSET_NAME, String.class);
+ if (charsetName != null) {
+ try {
+ bytes = value.getBytes(charsetName);
+ } catch (UnsupportedEncodingException e) {
+ LOG.warn("Cannot convert the byte to String with the charset " + charsetName, e);
+ }
+ }
+ }
+ if (bytes == null) {
+ bytes = value.getBytes();
+ }
+
+ return bytes;
}
@Converter
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java Wed Apr 1 09:54:55 2009
@@ -180,6 +180,19 @@
}
/**
+ * Converts the given input Source into bytes
+ */
+ @Converter
+ public byte[] toByteArray(Source source, Exchange exchange) throws TransformerException {
+ String answer = toString(source);
+ if (exchange != null) {
+ return exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, answer);
+ } else {
+ return answer.getBytes();
+ }
+ }
+
+ /**
* Converts the given input Node into text
*/
@Converter
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java Wed Apr 1 09:54:55 2009
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -50,22 +51,16 @@
protected boolean outputLocked;
protected OutputStream currentStream;
+ private final List<Object> streamList = new ArrayList<Object>();
private long threshold = 64 * 1024;
-
private int totalLength;
-
- private boolean inmem;
-
+ private boolean inMemory;
private File tempFile;
+ private File outputDir;
- private File outputDir;
-
- private List<Object> streamList = new ArrayList<Object>();
-
-
public CachedOutputStream() {
currentStream = new ByteArrayOutputStream(2048);
- inmem = true;
+ inMemory = true;
}
public CachedOutputStream(long threshold) {
@@ -100,7 +95,6 @@
* output stream ... etc.)
*/
protected void doFlush() throws IOException {
-
}
public void flush() throws IOException {
@@ -112,14 +106,12 @@
* Perform any actions required on stream closure (handle response etc.)
*/
protected void doClose() throws IOException {
-
}
/**
* Perform any actions required after stream closure (close the other related stream etc.)
*/
protected void postClose() throws IOException {
-
}
/**
@@ -166,14 +158,14 @@
InputStream in = ac.getInputStream();
IOHelper.copyAndCloseInput(in, out);
} else {
- if (inmem) {
+ if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
if (copyOldContent && byteOut.size() > 0) {
byteOut.writeTo(out);
}
} else {
- throw new IOException("Unknown format of currentStream");
+ throw new IOException("Unknown format of currentStream: " + currentStream);
}
} else {
// read the file
@@ -185,7 +177,7 @@
streamList.remove(currentStream);
tempFile.delete();
tempFile = null;
- inmem = true;
+ inMemory = true;
}
}
currentStream = out;
@@ -202,7 +194,7 @@
public byte[] getBytes() throws IOException {
flush();
- if (inmem) {
+ if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
return ((ByteArrayOutputStream)currentStream).toByteArray();
} else {
@@ -217,7 +209,7 @@
public void writeCacheTo(OutputStream out) throws IOException {
flush();
- if (inmem) {
+ if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
((ByteArrayOutputStream)currentStream).writeTo(out);
} else {
@@ -240,12 +232,12 @@
}
int count = 0;
- if (inmem) {
+ if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
} else {
- throw new IOException("Unknown format of currentStream");
+ throw new IOException("Unknown format of currentStream: " + currentStream);
}
} else {
// read the file
@@ -270,12 +262,12 @@
}
public void writeCacheTo(StringBuilder out) throws IOException {
flush();
- if (inmem) {
+ if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
out.append(IOHelper.newStringFromBytes(bytes));
} else {
- throw new IOException("Unknown format of currentStream");
+ throw new IOException("Unknown format of currentStream: " + currentStream);
}
} else {
// read the file
@@ -315,14 +307,13 @@
}
protected void onWrite() throws IOException {
-
}
public void write(byte[] b, int off, int len) throws IOException {
if (!outputLocked) {
onWrite();
this.totalLength += len;
- if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
createFileOutputStream();
}
currentStream.write(b, off, len);
@@ -333,7 +324,7 @@
if (!outputLocked) {
onWrite();
this.totalLength += b.length;
- if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
createFileOutputStream();
}
currentStream.write(b);
@@ -344,7 +335,7 @@
if (!outputLocked) {
onWrite();
this.totalLength++;
- if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+ if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
createFileOutputStream();
}
currentStream.write(b);
@@ -361,7 +352,7 @@
currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
bout.writeTo(currentStream);
- inmem = false;
+ inMemory = false;
streamList.add(currentStream);
}
@@ -371,7 +362,7 @@
public InputStream getInputStream() throws IOException {
flush();
- if (inmem) {
+ if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
} else {
@@ -388,14 +379,14 @@
streamList.add(fileInputStream);
return fileInputStream;
} catch (FileNotFoundException e) {
- throw new IOException("Cached file was deleted, " + e.toString());
+ throw IOHelper.createIOException("Cached file was already deleted", e);
}
}
}
public StreamCache getStreamCache() throws IOException {
flush();
- if (inmem) {
+ if (inMemory) {
if (currentStream instanceof ByteArrayOutputStream) {
return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
} else {
@@ -403,27 +394,27 @@
}
} else {
try {
- FileInputStreamCache fileInputStream = new FileInputStreamCache(tempFile, this);
- return fileInputStream;
+ return new FileInputStreamCache(tempFile, this);
} catch (FileNotFoundException e) {
- throw new IOException("Cached file was deleted, " + e.toString());
+ throw IOHelper.createIOException("Cached file was already deleted", e);
}
}
}
private void maybeDeleteTempFile(Object stream) {
streamList.remove(stream);
- if (!inmem && tempFile != null && streamList.isEmpty()) {
+ if (!inMemory && tempFile != null && streamList.isEmpty()) {
tempFile.delete();
tempFile = null;
currentStream = new ByteArrayOutputStream(1024);
- inmem = true;
+ inMemory = true;
}
}
public void setOutputDir(File outputDir) throws IOException {
this.outputDir = outputDir;
}
+
public void setThreshold(long threshold) {
this.threshold = threshold;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java Wed Apr 1 09:54:55 2009
@@ -26,41 +26,57 @@
import org.apache.camel.StreamCache;
public class FileInputStreamCache extends InputStream implements StreamCache {
- private FileInputStream inputStream;
+ private InputStream stream;
private CachedOutputStream cachedOutputStream;
private File file;
+ public FileInputStreamCache() {
+ }
+
public FileInputStreamCache(File file, CachedOutputStream cos) throws FileNotFoundException {
this.file = file;
- cachedOutputStream = cos;
- inputStream = new FileInputStream(file);
+ this.cachedOutputStream = cos;
+ this.stream = new FileInputStream(file);
}
+ @Override
public void close() {
try {
- inputStream.close();
- cachedOutputStream.close();
- } catch (Exception exception) {
- throw new RuntimeCamelException(exception);
+ getInputStream().close();
+ if (cachedOutputStream != null) {
+ cachedOutputStream.close();
+ }
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
}
}
- public void reset() {
+ @Override
+ public void reset() {
try {
- inputStream.close();
- inputStream = new FileInputStream(file);
- } catch (Exception exception) {
- throw new RuntimeCamelException(exception);
+ getInputStream().close();
+ // reset by creating a new stream based on the file
+ stream = new FileInputStream(file);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
}
}
+ @Override
public int available() throws IOException {
- return inputStream.available();
+ return getInputStream().available();
}
@Override
public int read() throws IOException {
- return inputStream.read();
- }
-
-}
+ return getInputStream().read();
+ }
+
+ protected InputStream getInputStream() throws FileNotFoundException {
+ if (file != null && stream == null) {
+ stream = new FileInputStream(file);
+ }
+ return stream;
+ }
+
+}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java?rev=760833&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java Wed Apr 1 09:54:55 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import org.apache.camel.StreamCache;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * {@link org.apache.camel.StreamCache} implementation for Cache the Reader {@link java.io.Reader}s
+ */
+public class ReaderCache extends StringReader implements StreamCache {
+
+ private static final transient Log LOG = LogFactory.getLog(ReaderCache.class);
+
+ private String data;
+
+ public ReaderCache(String data) {
+ super(data);
+ this.data = data;
+ }
+
+ public void close() {
+ // Do not release the string for caching
+ }
+
+ @Override
+ public void reset() {
+ try {
+ super.reset();
+ } catch (IOException e) {
+ LOG.warn("Cannot reset cache", e);
+ }
+ }
+
+ String getData() {
+ return data;
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java?rev=760833&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java Wed Apr 1 09:54:55 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import org.apache.camel.StreamCache;
+import org.apache.camel.converter.jaxp.StringSource;
+
+/**
+ * {@link org.apache.camel.StreamCache} implementation for {@link org.apache.camel.converter.jaxp.StringSource}s
+ */
+public class SourceCache extends StringSource implements StreamCache {
+
+ private static final long serialVersionUID = 1L;
+
+ public SourceCache() {
+ }
+
+ public SourceCache(String data) {
+ new StringSource(data);
+ }
+
+ public void reset() {
+ // do nothing here
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java Wed Apr 1 09:54:55 2009
@@ -19,20 +19,19 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
-import java.io.StringReader;
-
import javax.xml.transform.TransformerException;
import javax.xml.transform.sax.SAXSource;
import javax.xml.transform.stream.StreamSource;
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
+import org.apache.camel.FallbackConverter;
import org.apache.camel.StreamCache;
+import org.apache.camel.TypeConverter;
import org.apache.camel.converter.jaxp.BytesSource;
import org.apache.camel.converter.jaxp.StringSource;
+import org.apache.camel.spi.TypeConverterRegistry;
import org.apache.camel.util.IOHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* A set of {@link Converter} methods for wrapping stream-based messages in a {@link StreamCache}
@@ -40,25 +39,24 @@
*/
@Converter
public class StreamCacheConverter {
- private static final transient Log LOG = LogFactory.getLog(StreamCacheConverter.class);
@Converter
public StreamCache convertToStreamCache(StreamSource source, Exchange exchange) throws IOException {
return new StreamSourceCache(source, exchange);
}
-
+
@Converter
public StreamCache convertToStreamCache(StringSource source) {
//no need to do stream caching for a StringSource
return null;
}
-
+
@Converter
public StreamCache convertToStreamCache(BytesSource source) {
//no need to do stream caching for a BytesSource
return null;
}
-
+
@Converter
public StreamCache convertToStreamCache(SAXSource source, Exchange exchange) throws TransformerException {
String data = exchange.getContext().getTypeConverter().convertTo(String.class, source);
@@ -69,7 +67,7 @@
public StreamCache convertToStreamCache(InputStream stream, Exchange exchange) throws IOException {
// set up CachedOutputStream with the properties
CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
- IOHelper.copyAndCloseInput(stream, cos);
+ IOHelper.copyAndCloseInput(stream, cos);
return cos.getStreamCache();
}
@@ -79,81 +77,4 @@
return new ReaderCache(data);
}
- /*
- * {@link StreamCache} implementation for {@link Source}s
- */
- private class SourceCache extends StringSource implements StreamCache {
-
- private static final long serialVersionUID = 4147248494104812945L;
-
- public SourceCache() {
- }
-
- public SourceCache(String text) {
- super(text);
- }
-
- public void reset() {
- // do nothing here
- }
-
- }
-
- /*
- * {@link StreamCache} implementation for Cache the StreamSource {@link StreamSource}s
- */
- private class StreamSourceCache extends StreamSource implements StreamCache {
- StreamCache streamCache;
- ReaderCache readCache;
-
- public StreamSourceCache(StreamSource source, Exchange exchange) throws IOException {
- if (source.getInputStream() != null) {
- // set up CachedOutputStream with the properties
- CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
- IOHelper.copyAndCloseInput(source.getInputStream(), cos);
- streamCache = cos.getStreamCache();
- setInputStream((InputStream)streamCache);
- setSystemId(source.getSystemId());
- }
- if (source.getReader() != null) {
- String data = exchange.getContext().getTypeConverter().convertTo(String.class, source.getReader());
- readCache = new ReaderCache(data);
- setReader(readCache);
- }
- }
-
- public void reset() {
- if (streamCache != null) {
- streamCache.reset();
- }
- if (readCache != null) {
- readCache.reset();
- }
- }
-
- }
-
- private class ReaderCache extends StringReader implements StreamCache {
-
- public ReaderCache(String s) {
- super(s);
- }
-
- public void reset() {
- try {
- super.reset();
- } catch (IOException e) {
- LOG.warn("Cannot reset cache", e);
- }
- }
-
- public void close() {
- // Do not release the string for caching
- }
-
- }
-
-
-
-
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java?rev=760833&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java Wed Apr 1 09:54:55 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.CharBuffer;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StreamCache;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * {@link org.apache.camel.StreamCache} implementation for Cache the StreamSource {@link javax.xml.transform.stream.StreamSource}s
+ */
+public class StreamSourceCache extends StreamSource implements StreamCache {
+
+ private transient InputStream stream;
+ private StreamCache streamCache;
+ private ReaderCache readCache;
+
+ public StreamSourceCache() {
+ }
+
+ public StreamSourceCache(StreamSource source, Exchange exchange) throws IOException {
+ if (source.getInputStream() != null) {
+ // set up CachedOutputStream with the properties
+ CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
+ IOHelper.copyAndCloseInput(source.getInputStream(), cos);
+ streamCache = cos.getStreamCache();
+ setSystemId(source.getSystemId());
+ }
+ if (source.getReader() != null) {
+ String data = exchange.getContext().getTypeConverter().convertTo(String.class, source.getReader());
+ readCache = new ReaderCache(data);
+ setReader(readCache);
+ }
+ }
+
+ public void reset() {
+ if (streamCache != null) {
+ streamCache.reset();
+ }
+ if (readCache != null) {
+ readCache.reset();
+ }
+ if (stream != null) {
+ try {
+ stream.reset();
+ } catch (IOException e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ if (stream == null) {
+ if (streamCache instanceof InputStream) {
+ stream = (InputStream) streamCache;
+ } else if (readCache != null) {
+ String data = readCache.getData();
+ stream = new ByteArrayInputStream(data.getBytes());
+ }
+ }
+ return stream;
+ }
+
+ @Override
+ public void setInputStream(InputStream inputStream) {
+ // noop as the input stream is from stream or reader cache
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java Wed Apr 1 09:54:55 2009
@@ -63,7 +63,6 @@
}
public void testCacheStreamToFileAndCloseStream() throws IOException {
-
CachedOutputStream cos = new CachedOutputStream(16);
cos.setOutputDir(file);
cos.write(TEST_STRING.getBytes("UTF-8"));
@@ -85,11 +84,9 @@
}
files = file.list();
assertEquals("we should have no temp file", files.length, 0);
-
}
public void testCacheStreamToFileAndNotCloseStream() throws IOException {
-
CachedOutputStream cos = new CachedOutputStream(16);
cos.setOutputDir(file);
cos.write(TEST_STRING.getBytes("UTF-8"));
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Wed Apr 1 09:54:55 2009
@@ -19,7 +19,6 @@
import java.io.ByteArrayInputStream;
import java.io.InputStream;
-import java.io.Reader;
import java.io.StringReader;
import javax.xml.transform.stream.StreamSource;
@@ -69,7 +68,8 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3));
+ // 0 delay for faster unit test
+ errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3).delay(0));
from("direct:start").process(new Processor() {
public void process(Exchange exchange) throws Exception {
count++;
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Wed Apr 1 09:54:55 2009
@@ -371,27 +371,25 @@
switch (type) {
case Text: {
TextMessage message = session.createTextMessage();
- String payload = context.getTypeConverter().convertTo(String.class, body);
+ String payload = context.getTypeConverter().convertTo(String.class, exchange, body);
message.setText(payload);
return message;
}
case Bytes: {
BytesMessage message = session.createBytesMessage();
- byte[] payload = context.getTypeConverter().convertTo(byte[].class, body);
+ byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body);
message.writeBytes(payload);
return message;
}
case Map: {
MapMessage message = session.createMapMessage();
- Map payload = context.getTypeConverter().convertTo(Map.class, body);
+ Map payload = context.getTypeConverter().convertTo(Map.class, exchange, body);
populateMapMessage(message, payload, context);
return message;
}
case Object:
- return session.createObjectMessage((Serializable)body);
- case Strem:
- // TODO: Stream is not supported
- break;
+ Serializable payload = context.getTypeConverter().convertTo(Serializable.class, exchange, body);
+ return session.createObjectMessage(payload);
default:
break;
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java Wed Apr 1 09:54:55 2009
@@ -23,6 +23,6 @@
*/
public enum JmsMessageType {
- Bytes, Map, Object, Strem, Text
+ Bytes, Map, Object, Stream, Text
}
Added: camel/trunk/components/camel-jms/src/test/data/message1.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/data/message1.xml?rev=760833&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/data/message1.xml (added)
+++ camel/trunk/components/camel-jms/src/test/data/message1.xml Wed Apr 1 09:54:55 2009
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<person user="james">
+ <firstName>James</firstName>
+ <lastName>Strachan</lastName>
+ <city>London</city>
+</person>
\ No newline at end of file
Propchange: camel/trunk/components/camel-jms/src/test/data/message1.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/test/data/message1.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: camel/trunk/components/camel-jms/src/test/data/message1.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: camel/trunk/components/camel-jms/src/test/data/message2.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/data/message2.xml?rev=760833&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/data/message2.xml (added)
+++ camel/trunk/components/camel-jms/src/test/data/message2.xml Wed Apr 1 09:54:55 2009
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<person user="hiram">
+ <firstName>Hiram</firstName>
+ <lastName>Chirino</lastName>
+ <city>Tampa</city>
+</person>
\ No newline at end of file
Propchange: camel/trunk/components/camel-jms/src/test/data/message2.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/test/data/message2.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: camel/trunk/components/camel-jms/src/test/data/message2.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java?rev=760833&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java Wed Apr 1 09:54:55 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.io.FileInputStream;
+import javax.jms.ConnectionFactory;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.jaxp.StringSource;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * For unit testing with XML streams that can be troublesome with the StreamCache
+ *
+ * @version $Revision$
+ */
+public class JmsXMLRouteTest extends ContextTestSupport {
+
+ private static final String TEST_LONDON = "src/test/data/message1.xml";
+ private static final String TEST_TAMPA = "src/test/data/message2.xml";
+
+ public void testLondonWithFileStream() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:london");
+ mock.expectedMessageCount(1);
+ mock.message(0).bodyAs(String.class).contains("James");
+
+ Source source = new StreamSource(new FileInputStream(TEST_LONDON));
+ assertNotNull(source);
+
+ template.sendBody("direct:start", source);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testTampaWithFileStream() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:tampa");
+ mock.expectedMessageCount(1);
+ mock.message(0).bodyAs(String.class).contains("Hiram");
+
+ Source source = new StreamSource(new FileInputStream(TEST_TAMPA));
+ assertNotNull(source);
+
+ template.sendBody("direct:start", source);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testLondonWithStringSource() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:london");
+ mock.expectedMessageCount(1);
+ mock.message(0).bodyAs(String.class).contains("James");
+
+ Source source = new StringSource("<person user=\"james\">\n"
+ + " <firstName>James</firstName>\n"
+ + " <lastName>Strachan</lastName>\n"
+ + " <city>London</city>\n"
+ + "</person>");
+ assertNotNull(source);
+
+ template.sendBody("direct:start", source);
+
+ assertMockEndpointsSatisfied();
+ }
+
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").to("activemq:queue:foo");
+
+ from("activemq:queue:foo")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ Object body = exchange.getIn().getBody();
+ // should preserve the object as Source
+ assertIsInstanceOf(Source.class, body);
+ }
+ })
+ .choice()
+ .when().xpath("/person/city = 'London'").to("mock:london")
+ .when().xpath("/person/city = 'Tampa'").to("mock:tampa")
+ .otherwise().to("mock:unknown")
+ .end();
+ }
+ };
+ }
+
+
+}
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date