Posted to users@camel.apache.org by coolgold <gr...@edmunds.com> on 2010/10/04 19:14:43 UTC

FTP to HDFS - large gzipped files

I'm considering using Camel and ActiveMQ for moving gzipped (~1 GB) files from
an FTP server to HDFS. I'm aware of the existence of the FTP and HDFS components, but
I'm not sure about support for splitting gzipped files or a streaming
method with an FTP entry point.  What is the most straightforward/scalable
approach?
-- 
View this message in context: http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p3192431.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: FTP to HDFS - large gzipped files

Posted by Mond Raymond <mo...@gmail.com>.
OK.  It's done

https://issues.apache.org/jira/browse/CAMEL-4024

--
View this message in context: http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p4433386.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: FTP to HDFS - large gzipped files

Posted by Claus Ibsen <cl...@gmail.com>.
Nice. Do you mind creating a JIRA ticket and attaching your source? Then
we can use it as a contribution.

On Thursday, May 26, 2011, Mond Raymond <mo...@gmail.com> wrote:
> Claus,
>
> Just thought I would follow up.
>
> I don't know if this counts as a contribution, but I ended up doing this as
> a splitter as in:
>
> [snip: routes and splitter code]
>

-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
CamelOne 2011: http://fusesource.com/camelone2011/
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: FTP to HDFS - large gzipped files

Posted by Mond Raymond <mo...@gmail.com>.
Claus,

Just thought I would follow up.  

I don't know if this counts as a contribution, but I ended up implementing
this as two splitters, as in:

        from(fromUri)
                .split().method(OuterZipFileDecompressingSplitter.class, "split")
                .streaming()
                .parallelProcessing()
                .to("direct:innerZipFile");

        from("direct:innerZipFile")
                .split().method(InnerZipFileDecompressingSplitter.class, "split")
                .streaming()
                .parallelProcessing()
                .to("direct:furtherProcessing");

The splitters are:


import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.IOHelper;

public class InnerZipFileDecompressingSplitter {

    public List<Message> split(InputStream is) throws Exception {
        List<Message> messages = new ArrayList<Message>();

        ZipInputStream zis = new ZipInputStream(new BufferedInputStream(is));
        try {
            Map<String, String> entries = getUncompressedZipEntries(zis);

            for (Map.Entry<String, String> mapEntry : entries.entrySet()) {
                DefaultMessage message = new DefaultMessage();
                message.setHeader("Unzipped-File-Name", mapEntry.getKey());
                message.setBody(mapEntry.getValue());
                messages.add(message);
            }
        } finally {
            zis.close();
        }

        return messages;
    }

    private Map<String, String> getUncompressedZipEntries(ZipInputStream zis) throws Exception {
        Map<String, String> entries = new HashMap<String, String>();

        ZipEntry entry;
        while ((entry = zis.getNextEntry()) != null) {
            // Note: each entry is buffered fully in memory before being emitted
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            IOHelper.copy(zis, bos);
            entries.put(entry.getName(), bos.toString()); // platform default charset
            zis.closeEntry();
        }

        return entries;
    }
}


import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.IOHelper;

public class OuterZipFileDecompressingSplitter {

    public List<Message> split(InputStream is) throws Exception {
        List<Message> messages = new ArrayList<Message>();

        ZipInputStream zis = new ZipInputStream(new BufferedInputStream(is));
        try {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }

                DefaultMessage message = new DefaultMessage();
                message.setHeader("Inner-Zip-Name", entry.getName());
                message.setBody(getZipEntryAsByteArray(zis));
                messages.add(message);

                zis.closeEntry();
            }
        } finally {
            zis.close();
        }

        return messages;
    }

    // Buffers one inner zip file fully in memory; the inner splitter
    // then unpacks it entry by entry.
    private byte[] getZipEntryAsByteArray(ZipInputStream zis) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        IOHelper.copy(zis, bos);
        return bos.toByteArray();
    }
}


--
View this message in context: http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p4429800.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: FTP to HDFS - large gzipped files

Posted by Mond Raymond <mo...@gmail.com>.
Here is a Processor that works on ZIP files:

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.zip.ZipInputStream;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;

public class ZipFileSplittingProcessor implements Processor {

    public void process(Exchange exchange) throws Exception {
        InputStream is = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
        ZipInputStream zis = new ZipInputStream(new BufferedInputStream(is));
        exchange.getIn().setBody(getUncompressedZipEntries(zis));
    }

    private Collection<String> getUncompressedZipEntries(ZipInputStream zis) throws Exception {
        Collection<String> entries = new ArrayList<String>();
        try {
            while (zis.getNextEntry() != null) { // process each entry
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                IOHelper.copy(zis, bos);
                entries.add(bos.toString()); // platform default charset
            }
        } finally {
            zis.close(); // close even if copying an entry fails
        }
        return entries;
    }
}
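
For context, such a processor could be combined with Camel's splitter so each uncompressed entry becomes its own exchange. A rough, untested route sketch (the endpoint URIs are placeholders of mine, not from this thread):

```
from("ftp://host/inbox?localWorkDirectory=/tmp/work")
    .process(new ZipFileSplittingProcessor())
    .split(body()).streaming()
    .to("direct:furtherProcessing");
```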

--
View this message in context: http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p4383828.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: FTP to HDFS - large gzipped files

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Oct 5, 2010 at 9:28 AM, coolgold <gr...@edmunds.com> wrote:
>
> Thank you, Claus. Couldn't the gzip data format be used for unzipping?
> http://camel.apache.org/gzip-data-format.html
> --

Ah yeah, I forgot about that one. However, I think it works in memory
only. It's Java code, though, so you can always look at the source code
and do the same in a regular Java bean, unzipping from file to file
directly.

And we love contributions, so we could most likely add a feature to the
data format so it can store its output to a file directly.
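
As a sketch of that file-to-file idea, a plain Java bean can gunzip using only the JDK's GZIPInputStream, keeping memory use constant regardless of file size. The class and method names below are made up for illustration; this is not Camel API:

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;

public class GunzipToFile {

    // Streams source (.gz) into target using a fixed-size buffer,
    // so a ~1 GB file never has to fit in memory.
    public static void gunzip(File source, File target) throws IOException {
        InputStream in = new GZIPInputStream(new BufferedInputStream(new FileInputStream(source)));
        OutputStream out = new BufferedOutputStream(new FileOutputStream(target));
        try {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } finally {
            in.close();
            out.close();
        }
    }
}
```

Such a bean could then be invoked from a route between the FTP consumer and the HDFS producer.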


> View this message in context: http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p3198933.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: FTP to HDFS - large gzipped files

Posted by coolgold <gr...@edmunds.com>.
Thank you, Claus. Couldn't the gzip data format be used for unzipping?
http://camel.apache.org/gzip-data-format.html
-- 
View this message in context: http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p3198933.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: FTP to HDFS - large gzipped files

Posted by Claus Ibsen <cl...@gmail.com>.
On Mon, Oct 4, 2010 at 7:14 PM, coolgold <gr...@edmunds.com> wrote:
>
> I'm considering using Camel and ActiveMQ for moving gzipped (~1 GB) files from
> an FTP server to HDFS. I'm aware of the existence of the FTP and HDFS components, but
> I'm not sure about support for splitting gzipped files or a streaming
> method with an FTP entry point.  What is the most straightforward/scalable
> approach?
> --

On the FTP endpoint you can configure a local work path, which tells
Camel to stream the remote FTP file directly to a temporary file in
that path. Then you won't read the file into memory.
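
For illustration only, a consumer URI with such a work path might look like this (host, paths, and credentials are placeholders; localWorkDirectory is the camel-ftp option that enables spooling to disk):

```
ftp://user@ftphost/inbox?password=secret&binary=true&localWorkDirectory=/tmp/ftp-work
```

With that option set, the consumer downloads into a temporary file under /tmp/ftp-work and hands the route a file-backed body rather than an in-memory byte array.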

We don't have any Camel component for reading compressed files such as
gzip. The Apache Commons VFS project does have such features, but that
project is dying/dead. There may be other libraries out there which
make it easy to work with compressed files. And we love
contributions.



> View this message in context: http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p3192431.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus