You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2008/05/16 15:21:23 UTC

svn commit: r657053 - in /servicemix/smx3/trunk/core/servicemix-core/src: main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java test/java/org/apache/servicemix/components/util/SimpleFlatFileMarshalerTest.java

Author: lhein
Date: Fri May 16 06:21:23 2008
New Revision: 657053

URL: http://svn.apache.org/viewvc?rev=657053&view=rev
Log:
applied patch on behalf of Andrew Skiba (see SM-1349)
Thanks for the help, Andrew

Modified:
    servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java
    servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/components/util/SimpleFlatFileMarshalerTest.java

Modified: servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java?rev=657053&r1=657052&r2=657053&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java Fri May 16 06:21:23 2008
@@ -29,15 +29,18 @@
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
 
+import javax.xml.transform.stream.StreamSource;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A simple flat file marshaler that can read fixed-length and csv text files
  * and converts them to XML
  * 
  * @author Juergen Mayrbaeurl
+ * @author Andrew Skiba
  * @since 3.2
  */
 public class SimpleFlatFileMarshaler extends DefaultFileMarshaler {
@@ -59,7 +62,8 @@
     private static final String XML_CLOSE_NEWLINE = ">\n";
     private static final String XML_CLOSE_ATTR_NEWLINE = "\">\n";
     private static final String XML_CLOSE_ATTR = "\">";
-
+    protected final Log log = LogFactory.getLog(getClass());
+    
     private boolean xmlDeclaration = true;
 
     /**
@@ -109,15 +113,25 @@
     private boolean insertColNumbers;
 
     private boolean skipKnownEmptyCols = true;
-
+    
+    private boolean skipAnyEmptyCols;
+    
     private boolean alwaysStripColContents = true;
-
+    
     private boolean insertRawData;
 
     private boolean insertColContentInAttribut;
 
     private int headerlinesCount;
 
+    public boolean isSkipAnyEmptyCols() {
+        return skipAnyEmptyCols;
+    }
+
+    public void setSkipAnyEmptyCols(boolean skipAnyEmptyCols) {
+        this.skipAnyEmptyCols = skipAnyEmptyCols;
+    }
+
     private static class CustomEndOfLineIterator
             implements Iterator {
 
@@ -198,76 +212,204 @@
     }
 
     public void readMessage(MessageExchange exchange,
-                            NormalizedMessage message, 
-                            InputStream in, 
+                            NormalizedMessage message,
+                            InputStream in,
                             String path) throws IOException, JBIException {
-        message.setContent(new StringSource(this.convertLinesToString(message, in, path)));
+        message.setContent(new StreamSource(this.convertLines(message, in, path)));
         message.setProperty(FILE_NAME_PROPERTY, new File(path).getName());
         message.setProperty(FILE_PATH_PROPERTY, path);
     }
-
+    
     // Implementation methods
     // -------------------------------------------------------------------------
     protected InputStream convertLinesToStream(NormalizedMessage message,
                                                InputStream in, String path) throws IOException {
-        return IOUtils.toInputStream(this.convertLinesToString(message, in,
-                path), "UTF-8");
+        //Backward compatibility trick: if implemented in a subclass, overriden behavior will be used
+        //if buffer is untouched, headers will be proceed line-by-line
+        String wholeFileConverted = this.convertLinesToString(message, in, path);
+        if (wholeFileConverted != null) {
+            return IOUtils.toInputStream(wholeFileConverted, "UTF-8");
+        } else {
+            return new InputStreamWrapper(in, path, "UTF-8");
+        }
     }
 
     protected InputStream convertLines(NormalizedMessage message,
                                        InputStream in, String path) throws IOException {
         return this.convertLinesToStream(message, in, path);
     }
-
+    
+    //This method is for backward compatibility only, use InputStreamWrapper to
+    //process InputStream line-by-line
+    @Deprecated
     protected String convertLinesToString(NormalizedMessage message,
                                           InputStream in, String path) throws IOException {
-        Iterator lines;
-        if (lineSeparator == null) {
-            lines = IOUtils.lineIterator(in, this.encoding);
-        } else {
-            lines = new CustomEndOfLineIterator(in, this.encoding, lineSeparator);
+        //Backward compatibility trick: if implemented in a subclass, overriden behavior will be used
+        //if returns null, file will be proceed line-by-line
+        return null;
+    }
+
+
+
+    final class InputStreamWrapper extends InputStream {
+
+        private boolean isEOF;
+        private byte[] cache;
+        private int cacheLen;
+        private int cachePos;
+        private InputStream in;
+        private String path;
+        private String outEncoding;
+        private int headerLinesLeft;
+        private Iterator lines;
+        private int lineNumber;
+        private boolean isFooterFilled;
+
+        InputStreamWrapper(InputStream in,
+                String path, String outEncoding) throws
+                UnsupportedEncodingException, IOException {
+            log.trace("Entered InputStreamWrapper constructor");
+
+            //make sure outEncoding is good, otherwise fail early
+            " ".getBytes(outEncoding);
+
+            this.in = in;
+            this.path = path;
+            this.outEncoding = outEncoding;
+            this.headerLinesLeft = headerlinesCount;
+            if (lineSeparator == null) {
+                lines = IOUtils.lineIterator(in, encoding);
+            } else {
+                lines =
+                        new CustomEndOfLineIterator(in, encoding, lineSeparator);
+            }
+            log.trace("Leaving InputStreamWrapper constructor");
         }
 
-        StringBuffer aBuffer = new StringBuffer(1024);
+        @Override
+        public int read() throws IOException {
+            fillCache();
+            if (isEOF) {
+                return -1;
+            }
+            return 0xFF & cache[cachePos++];
+        }
+
+        private void fillCache() throws IOException {
+            if (cachePos < cacheLen || isEOF) {
+                return;
+            }
+            if (cache == null) {
+                fillInitial();
+                return;
+            }
+            if (!lines.hasNext()) {
+                if (!isFooterFilled) {
+                    fillFooter();
+                    return;
+                } else {
+                    isEOF = true;
+                    cache = null;
+                    cachePos = 0;
+                    cacheLen = 0;
+                    return;
+                }
+            }
 
-        if (this.xmlDeclaration) {
-            aBuffer.append(XMLDECLARATION_LINE);
+            if (headerLinesLeft > 0) {
+                fillHeader();
+            } else {
+                fillBody();
+            }
         }
 
-        aBuffer.append(XML_OPEN + this.docElementname);
+        @Override
+        public int available() throws IOException {
+            return cacheLen - cachePos + in.available();
+        }
 
-        if (this.docElementNamespace != null) {
-            aBuffer.append("xmlns=\"");
-            aBuffer.append(this.docElementNamespace);
-            aBuffer.append("\"");
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+
+        private void fill(String string) {
+            if (log.isTraceEnabled()) {
+                log.trace("InputStreamWrapper.fill(" + string + ")");
+            }
+            try {
+                cache = string.getBytes(outEncoding);
+            } catch (UnsupportedEncodingException ex) {
+                throw new RuntimeException(
+                        "Bug in the code flow: unsupported encoding should be detected in constructor",
+                        ex);
+            }
+            cachePos = 0;
+            cacheLen = cache.length;
         }
 
-        aBuffer.append(" name=\"");
-        aBuffer.append(new File(path).getName());
-        aBuffer.append("\"");
-
-        aBuffer.append(" location=\"");
-        aBuffer.append(path);
-        aBuffer.append(XML_CLOSE_ATTR_NEWLINE);
+        private void fillFooter() {
+            isFooterFilled = true;
+            fill(XML_OPEN_END + docElementname + XML_CLOSE_NEWLINE);
+        }
+
+        private void fillHeader() throws IOException {
+            headerLinesLeft--;
+            StringBuffer aBuffer = new StringBuffer(1024);
+            String headerLine = (String) lines.next();
+            convertHeaderline(aBuffer, headerLine);
+            fill(aBuffer.toString());
+        }
+
+        private void fillInitial() throws IOException {
+            StringBuffer aBuffer = new StringBuffer(1024);
+
+            if (xmlDeclaration) {
+                aBuffer.append(XMLDECLARATION_LINE);
+            }
+
+            aBuffer.append(XML_OPEN + docElementname);
 
-        this.processHeaderLines(aBuffer, lines);
+            if (docElementNamespace != null) {
+                aBuffer.append("xmlns=\"");
+                aBuffer.append(docElementNamespace);
+                aBuffer.append("\"");
+            }
+
+            aBuffer.append(" name=\"");
+            aBuffer.append(new File(path).getName());
+            aBuffer.append("\"");
+
+            aBuffer.append(" location=\"");
+            aBuffer.append(path);
+            aBuffer.append(XML_CLOSE_ATTR_NEWLINE);
+            int overridenCheck = aBuffer.length();
+            processHeaderLines(aBuffer, lines);
+            if (aBuffer.length() != overridenCheck) {
+                //headers were proceed by an overriden method, supress futher processing
+                headerLinesLeft = 0;
+            }
+            fill(aBuffer.toString());
+        }
+
+        private void fillBody() throws IOException {
+            lineNumber++;
 
-        int lineNumber = 1;
-        while (lines.hasNext()) {
+            StringBuffer aBuffer = new StringBuffer(1024);
             String lineText = (String) lines.next();
-            aBuffer.append(XML_OPEN + this.lineElementname);
+            aBuffer.append(XML_OPEN + lineElementname);
 
-            if (this.insertLineNumbers || this.insertRawData) {
-                if (this.insertLineNumbers) {
+            if (insertLineNumbers || insertRawData) {
+                if (insertLineNumbers) {
                     aBuffer.append(" number=\"");
                     aBuffer.append(lineNumber);
-                    if (!this.insertRawData) {
+                    if (!insertRawData) {
                         aBuffer.append(XML_CLOSE_ATTR);
                     } else {
                         aBuffer.append("\"");
                     }
                 }
-                if (this.insertRawData) {
+                if (insertRawData) {
                     aBuffer.append(" raw=\"");
                     aBuffer.append(lineText);
                     aBuffer.append(XML_CLOSE_ATTR);
@@ -276,36 +418,31 @@
                 aBuffer.append(XML_CLOSE);
             }
 
-            if ((this.columnLengths != null)
-                    || (this.lineFormat != LINEFORMAT_FIXLENGTH)) {
-                this.extractColumns(aBuffer, lineText, lines);
+            if ((columnLengths != null)
+                    || (lineFormat != LINEFORMAT_FIXLENGTH)) {
+                extractColumns(aBuffer, lineText, lines);
             } else {
                 aBuffer.append(lineText);
             }
-            aBuffer.append(XML_OPEN_END + this.lineElementname
+            aBuffer.append(XML_OPEN_END + lineElementname
                     + XML_CLOSE_NEWLINE);
-
-            lineNumber++;
+            
+            fill(aBuffer.toString());
         }
-
-        aBuffer.append(XML_OPEN_END + this.docElementname + XML_CLOSE_NEWLINE);
-
-        return aBuffer.toString();
     }
 
+    
+    @Deprecated
     protected void processHeaderLines(StringBuffer buffer, Iterator lines) {
-        if ((this.headerlinesCount > 0) && (lines.hasNext())) {
-            int counter = 0;
-            do {
-                String headerline = (String) lines.next();
-                this.convertHeaderline(buffer, headerline);
-
-                counter++;
-            } while ((counter < this.headerlinesCount) && (lines.hasNext()));
-        }
+        //Backward compatibility trick: if implemented in a subclass, overriden behavior will be used
+        //if buffer is untouched, headers will be proceed line-by-line
     }
 
-    protected void convertHeaderline(StringBuffer buffer, String headerline) {
+    protected void convertHeaderline(StringBuffer buffer, String headerLine) {
+        buffer.append("<!-- ");
+        headerLine = TEXT_STRIPPER.convertToXml(headerLine);
+        headerLine = headerLine.replaceAll("--", "__");
+        buffer.append(" -->\n");
     }
 
     protected void extractColumns(StringBuffer buffer, String lineText,
@@ -324,8 +461,7 @@
                     // Or maybe insert NULL Element
                 } else {
                     if (!((colContents.length() == 0)
-                            && (this.skipKnownEmptyCols) && (!this.columnElementname
-                            .equals(colName)))) {
+                            && (this.skipAnyEmptyCols || (this.skipKnownEmptyCols && (!this.columnElementname.equals(colName)))))) {
                         if (this.insertColContentInAttribut) {
                             buffer.append(XML_OPEN + colName);
 

Modified: servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/components/util/SimpleFlatFileMarshalerTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/components/util/SimpleFlatFileMarshalerTest.java?rev=657053&r1=657052&r2=657053&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/components/util/SimpleFlatFileMarshalerTest.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/components/util/SimpleFlatFileMarshalerTest.java Fri May 16 06:21:23 2008
@@ -20,47 +20,50 @@
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.jbi.JBIException;
 import junit.framework.TestCase;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 
 public class SimpleFlatFileMarshalerTest extends TestCase {
 
-    public void testFixedLengthMarshalling() throws FileNotFoundException, IOException {
+    public void testFixedLengthMarshalling() throws FileNotFoundException, IOException, JBIException {
         SimpleFlatFileMarshaler marshaler = new SimpleFlatFileMarshaler();
         marshaler.setLineFormat(SimpleFlatFileMarshaler.LINEFORMAT_FIXLENGTH);
         marshaler.setColumnLengths(new String[] {"2", "3", "5" });
 
         String path = "./src/test/resources/org/apache/servicemix/components/util/fixedlength.txt";
-        String result = marshaler.convertLinesToString(null, new FileInputStream(new File(path)), path);
+        String result = convertLinesToString(marshaler, new FileInputStream(new File(path)), path);
         assertNotNull(result);
         assertTrue(result.length() > 0);
 
         assertTrue(result.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"));
         assertTrue(result.contains("<File"));
-        assertTrue(result.endsWith("</File>\n"));
+        assertTrue(result.endsWith("</File>"));
 
         assertTrue(StringUtils.countMatches(result, "<Line") == 3);
     }
 
-    public void testFixedLengthWithColNamesMarshalling() throws FileNotFoundException, IOException {
+    public void testFixedLengthWithColNamesMarshalling() throws FileNotFoundException, IOException, JBIException {
         SimpleFlatFileMarshaler marshaler = new SimpleFlatFileMarshaler();
         marshaler.setLineFormat(SimpleFlatFileMarshaler.LINEFORMAT_FIXLENGTH);
         marshaler.setColumnLengths(new String[] {"2", "3", "5" });
         marshaler.setColumnNames(new String[] {"First", "Second", "Third" });
 
         String path = "./src/test/resources/org/apache/servicemix/components/util/fixedlength.txt";
-        String result = marshaler.convertLinesToString(null, new FileInputStream(new File(path)), path);
+        String result = convertLinesToString(marshaler, new FileInputStream(new File(path)), path);
         assertNotNull(result);
         assertTrue(result.length() > 0);
 
         assertTrue(result.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"));
         assertTrue(result.contains("<File"));
-        assertTrue(result.endsWith("</File>\n"));
+        assertTrue(result.endsWith("</File>"));
 
         assertTrue(StringUtils.countMatches(result, "<Line") == 3);
         assertTrue(StringUtils.countMatches(result, "<First") == 3);
@@ -68,7 +71,7 @@
         assertTrue(StringUtils.countMatches(result, "<Third") == 3);
     }
 
-    public void testFixedLengthWithConversionMarshalling() throws FileNotFoundException, IOException {
+    public void testFixedLengthWithConversionMarshalling() throws FileNotFoundException, IOException, JBIException {
         SimpleFlatFileMarshaler marshaler = new SimpleFlatFileMarshaler();
         marshaler.setLineFormat(SimpleFlatFileMarshaler.LINEFORMAT_FIXLENGTH);
         marshaler.setColumnLengths(new String[] {"2", "3", "5", "8" });
@@ -82,13 +85,13 @@
         marshaler.setColumnConverters(columnConverters);
 
         String path = "./src/test/resources/org/apache/servicemix/components/util/fixedlength_morecomplex.txt";
-        String result = marshaler.convertLinesToString(null, new FileInputStream(new File(path)), path);
+        String result = convertLinesToString(marshaler, new FileInputStream(new File(path)), path);
         assertNotNull(result);
         assertTrue(result.length() > 0);
 
         assertTrue(result.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"));
         assertTrue(result.contains("<File"));
-        assertTrue(result.endsWith("</File>\n"));
+        assertTrue(result.endsWith("</File>"));
 
         assertTrue(StringUtils.countMatches(result, "<Line") == 3);
         assertTrue(StringUtils.countMatches(result, "<Number") == 3);
@@ -97,20 +100,70 @@
         assertTrue(StringUtils.countMatches(result, "<Date") == 2);
     }
 
-    public void testCSVMarshalling() throws FileNotFoundException, IOException {
+    public void testCSVMarshalling() throws FileNotFoundException, IOException, JBIException {
         SimpleFlatFileMarshaler marshaler = new SimpleFlatFileMarshaler();
         marshaler.setLineFormat(SimpleFlatFileMarshaler.LINEFORMAT_CSV);
 
         String path = "./src/test/resources/org/apache/servicemix/components/util/csv_simplesample.csv";
-        String result = marshaler.convertLinesToString(null, new FileInputStream(new File(path)), path);
+        String result = convertLinesToString(marshaler, new FileInputStream(new File(path)), path);
         assertNotNull(result);
         assertTrue(result.length() > 0);
 
         assertTrue(result.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"));
         assertTrue(result.contains("<File"));
-        assertTrue(result.endsWith("</File>\n"));
+        assertTrue(result.endsWith("</File>"));
 
         assertTrue(StringUtils.countMatches(result, "<Line") == 3);
     }
+    
+    class FileFewTimes extends InputStream {
+        byte [] buffer;
+        int pos;
+        int timesLeft;
+        public FileFewTimes(File file, int times) throws FileNotFoundException, IOException {
+            this.timesLeft = times;
+            buffer = new byte [(int)file.length()];
+            FileInputStream fs = new FileInputStream(file);
+            if (buffer.length != fs.read(buffer)) {
+                throw new IOException("Unexpected end of file " + file.getCanonicalPath());
+            }
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (pos < buffer.length) {
+                return buffer[pos++];
+            }
+            if (timesLeft == 0) {
+                return -1;
+            }
+            timesLeft--;
+            pos = 0;
+            return buffer[pos++];
+        }
+    }
+    
+    public void testHugeStream() throws FileNotFoundException, IOException {
+        SimpleFlatFileMarshaler marshaler = new SimpleFlatFileMarshaler();
+        marshaler.setLineFormat(SimpleFlatFileMarshaler.LINEFORMAT_CSV);
+
+        String path = "./src/test/resources/org/apache/servicemix/components/util/csv_simplesample.csv";
+        InputStream in = new FileFewTimes(new File(path), 500000);
+        InputStream out = marshaler.convertLines(null, in, path);
+        int r = 0;
+        while (r != -1) {
+            r = out.read();
+        }
+    }
+
+    private String convertLinesToString(SimpleFlatFileMarshaler marshaler, FileInputStream fileInputStream,
+            String path) throws IOException, JBIException {
+        InputStream out = marshaler.convertLines(null, fileInputStream, path);
+        StringBuilder sb = new StringBuilder();
+        for (Object string : IOUtils.readLines(out)) {
+            sb.append(string);
+        }
+        return sb.toString();
+    }
 
 }