Posted to user@flink.apache.org by Andrea Cisternino <a....@gmail.com> on 2016/06/07 14:35:02 UTC

Reading whole files (from S3)

Hi all,

I am evaluating Apache Flink for processing large sets of Geospatial data.
The use case I am working on will involve reading a certain number of GPX
files stored on Amazon S3.

GPX files are actually XML files and therefore cannot be read on a line by
line basis.
One GPX file will produce one or more Java objects that will contain the
geospatial data we need to process (mostly a list of geographical points).
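
For illustration only, here is a minimal sketch of how the content of one GPX
file, once available as a string, could be turned into a list of points. It
assumes the standard GPX layout where each track point is a <trkpt> element
with lat and lon attributes, and it uses the JDK's StAX parser. GpxPoints,
Point and parseTrackPoints are hypothetical names chosen for this example;
they are not part of Flink or of our application.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

class GpxPoints {

    /** Hypothetical value class for one geographical point. */
    static final class Point {
        final double lat;
        final double lon;

        Point(double lat, double lon) {
            this.lat = lat;
            this.lon = lon;
        }
    }

    /** Collects the lat/lon attributes of every trkpt element in the document. */
    static List<Point> parseTrackPoints(String gpxContent) {
        List<Point> points = new ArrayList<>();
        XMLInputFactory factory = XMLInputFactory.newInstance();
        // GPX needs no DTD, so DTD support is switched off.
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
        try {
            XMLStreamReader reader =
                    factory.createXMLStreamReader(new StringReader(gpxContent));
            while (reader.hasNext()) {
                if (reader.next() == XMLStreamConstants.START_ELEMENT
                        && "trkpt".equals(reader.getLocalName())) {
                    // A null namespace matches the attribute by local name only.
                    // Assumption: both attributes are present on every trkpt.
                    double lat = Double.parseDouble(reader.getAttributeValue(null, "lat"));
                    double lon = Double.parseDouble(reader.getAttributeValue(null, "lon"));
                    points.add(new Point(lat, lon));
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalArgumentException("Not well-formed XML", e);
        }
        return points;
    }

    public static void main(String[] args) {
        String gpx = "<gpx xmlns=\"http://www.topografix.com/GPX/1/1\" version=\"1.1\">"
                + "<trk><trkseg><trkpt lat=\"49.59\" lon=\"11.00\"/></trkseg></trk></gpx>";
        System.out.println("points found: " + parseTrackPoints(gpx).size());
    }
}
```

The sketch ignores everything except the coordinates; attributes such as
speed or elevation would need extra handling.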

To cover this use case I tried to extend the FileInputFormat class:

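import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

public class WholeFileInputFormat extends FileInputFormat<String>
{
  // true once the single record (the whole file) has been returned
  private boolean hasReachedEnd = false;

  public WholeFileInputFormat() {
    // never split a file: each file has to be read as one unit
    unsplittable = true;
  }

  @Override
  public void open(FileInputSplit fileSplit) throws IOException {
    super.open(fileSplit);
    // a new file has been opened, so its record is still pending
    hasReachedEnd = false;
  }

  @Override
  public String nextRecord(String reuse) throws IOException {
    // read everything from the stream opened by super.open() into one string
    String fileContent = IOUtils.toString(stream, StandardCharsets.UTF_8);
    hasReachedEnd = true;
    return fileContent;
  }

  @Override
  public boolean reachedEnd() throws IOException {
    return hasReachedEnd;
  }
}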

This class returns the content of the whole file as a string.

Is this the right approach?
It seems to work when run locally with local files but I wonder if it would
run into problems when tested in a cluster.

Thanks in advance.
  Andrea.

-- 
Andrea Cisternino, Erlangen, Germany
GitHub: http://github.com/acisternino
GitLab: https://gitlab.com/u/acisternino

Re: Reading whole files (from S3)

Posted by Robert Metzger <rm...@apache.org>.
Hi,
setting the unsplittable attribute in the constructor is fine. The field's
value will be sent to the cluster.
What happens is that you initialize the input format in your client
program. Then it's serialized, sent over the network to the machines, and
deserialized again. So the value you've set in the constructor will end up
in the cluster.
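
For illustration, the round trip described above can be reproduced without
Flink. This is only a sketch, assuming the object is shipped with plain Java
serialization; DemoFormat and roundTrip are hypothetical names made up for
this example and are not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CtorSerializationDemo {

    /** Hypothetical stand-in for an input format; not a Flink class. */
    static class DemoFormat implements Serializable {
        private static final long serialVersionUID = 1L;

        // Counts constructor calls in this JVM; static fields are not serialized.
        static int constructorCalls = 0;

        boolean unsplittable = false;

        DemoFormat() {
            constructorCalls++;
            unsplittable = true;
        }
    }

    /** Writes the object to a byte array and reads it back, like a trip over the network. */
    static DemoFormat roundTrip(DemoFormat format) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(format);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoFormat) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        DemoFormat copy = roundTrip(new DemoFormat());
        System.out.println("unsplittable after round trip: " + copy.unsplittable);
        System.out.println("constructor calls: " + DemoFormat.constructorCalls);
    }
}
```

In this sketch the constructor is not run again when the object is
deserialized; the field value simply travels with the serialized object.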

On Fri, Jun 10, 2016 at 10:53 AM, Andrea Cisternino <a....@gmail.com>
wrote:

> Hi,
>
> I am replying to myself for the record and to provide an update on what I
> am trying to do.
>
> I have looked into Mahout's XmlInputFormat class but unfortunately it
> doesn't solve my problem.
>
> My exploratory work with Flink tries to reproduce the key steps that we
> already perform in a quite large Apache Spark application that runs on
> Amazon EMR.
>
> For our use case the GPX files are not collections of independent records
> that could be split and analyzed in parallel. Instead, more than 95% of
> them are considered by our algorithms as a single record (a so-called
> "Track").
>
> IOW, we would not gain anything by splitting the files because in the vast
> majority of the cases we would get only one slice out of one file, defeating
> the purpose of splitting them in the first place.
>
> GPX files also have another nasty property: they come in two versions (1.0
> and 1.1, see more at http://www.topografix.com/gpx.asp).
> Important attributes of a point (e.g. speed) are encoded very differently
> in the two versions and therefore the parsing logic must be different, at
> least for some sections of the file.
>
> To recognize the file version, the parser must look at the entire file
> because this information is available only in the namespace declaration of
> the root element.
>
> On top of all of this I think that, because of their small size and
> because we read all of them from S3, splitting within the file is not an
> issue. Can you confirm that?
>
> Going back to my WholeFileInputFormat class, I am worried about setting
> the unsplittable attribute to true in the constructor. Will the
> constructor also be invoked when running in a cluster?
>
> Well, I think I really need to set up a small Flink cluster and try it
> myself :)
>
> Thanks again.
>   Andrea.

Re: Reading whole files (from S3)

Posted by Andrea Cisternino <a....@gmail.com>.
Hi,

I am replying to myself for the record and to provide an update on what I
am trying to do.

I have looked into Mahout's XmlInputFormat class but unfortunately it
doesn't solve my problem.

My exploratory work with Flink tries to reproduce the key steps that we
already perform in a quite large Apache Spark application that runs on
Amazon EMR.

For our use case the GPX files are not collections of independent records
that could be split and analyzed in parallel. Instead, more than 95% of
them are considered by our algorithms as a single record (a so-called
"Track").

IOW, we would not gain anything by splitting the files because in the vast
majority of the cases we would get only one slice out of one file, defeating
the purpose of splitting them in the first place.

GPX files also have another nasty property: they come in two versions (1.0
and 1.1, see more at http://www.topografix.com/gpx.asp).
Important attributes of a point (e.g. speed) are encoded very differently
in the two versions and therefore the parsing logic must be different, at
least for some sections of the file.

To recognize the file version, the parser must look at the entire file
because this information is available only in the namespace declaration of
the root element.
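
As a rough sketch of that version check, the snippet below reads the
namespace of the root element with the JDK's StAX parser and stops there. It
assumes the two namespace URIs published with the GPX schemas
(http://www.topografix.com/GPX/1/0 and http://www.topografix.com/GPX/1/1).
GpxVersionSniffer and detectVersion are hypothetical names used only for
this example.

```java
import java.io.StringReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

class GpxVersionSniffer {

    static final String GPX_1_0_NS = "http://www.topografix.com/GPX/1/0";
    static final String GPX_1_1_NS = "http://www.topografix.com/GPX/1/1";

    /** Returns "1.0", "1.1" or "unknown" depending on the root element's namespace. */
    static String detectVersion(String fileContent) {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        // GPX needs no DTD, so DTD support is switched off.
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
        try {
            XMLStreamReader reader =
                    factory.createXMLStreamReader(new StringReader(fileContent));
            try {
                while (reader.hasNext()) {
                    // The first START_ELEMENT event is the root element.
                    if (reader.next() == XMLStreamConstants.START_ELEMENT) {
                        String namespace = reader.getNamespaceURI();
                        if (GPX_1_0_NS.equals(namespace)) {
                            return "1.0";
                        }
                        if (GPX_1_1_NS.equals(namespace)) {
                            return "1.1";
                        }
                        return "unknown";
                    }
                }
                return "unknown";
            } finally {
                reader.close();
            }
        } catch (XMLStreamException e) {
            throw new IllegalArgumentException("Not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        String gpx = "<gpx xmlns=\"" + GPX_1_1_NS + "\" version=\"1.1\"></gpx>";
        System.out.println("detected version: " + detectVersion(gpx));
    }
}
```

The check only works when the parser starts at the beginning of the document,
which is one more reason to hand each file to the parser as a whole.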

On top of all of this I think that, because of their small size and because
we read all of them from S3, splitting within the file is not an issue. Can
you confirm that?

Going back to my WholeFileInputFormat class, I am worried about setting the
unsplittable attribute to true in the constructor. Will the constructor
also be invoked when running in a cluster?

Well, I think I really need to set up a small Flink cluster and try it
myself :)

Thanks again.
  Andrea.

On 8 June 2016 at 08:16, Andrea Cisternino <a....@gmail.com> wrote:

> Jamie, Suneel thanks a lot, your replies have been very helpful.
>
> I will definitely take a look at XMLInputFormat.
>
> In any case the files are not very big: on average 100-200kB up to a max
> of a couple of MB.

-- 
Andrea Cisternino, Erlangen, Germany
LinkedIn: http://www.linkedin.com/in/andreacisternino
GitHub: http://github.com/acisternino

Re: Reading whole files (from S3)

Posted by Andrea Cisternino <a....@gmail.com>.
Jamie, Suneel thanks a lot, your replies have been very helpful.

I will definitely take a look at XMLInputFormat.

In any case the files are not very big: on average 100-200kB up to a max of
a couple of MB.


On 8 June 2016 at 04:23, Suneel Marthi <sm...@apache.org> wrote:

> You can use Mahout's XMLInputFormat through Flink's HadoopInputFormat
> definitions. See:
>
> http://stackoverflow.com/questions/29429428/xmlinputformat-for-apache-flink
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Read-XML-from-HDFS-td7023.html


-- 
Andrea Cisternino, Erlangen, Germany
LinkedIn: http://www.linkedin.com/in/andreacisternino
GitHub: http://github.com/acisternino

Re: Reading whole files (from S3)

Posted by Suneel Marthi <sm...@apache.org>.
You can use Mahout's XMLInputFormat through Flink's HadoopInputFormat
definitions. See:

http://stackoverflow.com/questions/29429428/xmlinputformat-for-apache-flink
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Read-XML-from-HDFS-td7023.html


On Tue, Jun 7, 2016 at 10:11 PM, Jamie Grier <ja...@data-artisans.com>
wrote:

> Hi Andrea,
>
> How large are these data files?  The implementation you've mentioned here
> is only usable if they are very small.  If so, you're fine.  If not read
> on...
>
> Processing XML input files in parallel is tricky.  It's not a great format
> for this type of processing as you've seen.  They are tricky to split and
> more complex to iterate through than simpler formats. However, others have
> implemented XMLInputFormat classes for Hadoop.  Have you looked at these?
> Mahout has an XMLInputFormat implementation for example but I haven't used
> it directly.
>
> Anyway, you can reuse Hadoop InputFormat implementations in Flink
> directly.  This is likely a good route.  See Flink's HadoopInputFormat
> class.
>
> -Jamie

Re: Reading whole files (from S3)

Posted by Jamie Grier <ja...@data-artisans.com>.
Hi Andrea,

How large are these data files?  The implementation you've mentioned here
is only usable if they are very small.  If so, you're fine.  If not read
on...

Processing XML input files in parallel is tricky.  It's not a great format
for this type of processing as you've seen.  They are tricky to split and
more complex to iterate through than simpler formats. However, others have
implemented XMLInputFormat classes for Hadoop.  Have you looked at these?
Mahout has an XMLInputFormat implementation for example but I haven't used
it directly.

Anyway, you can reuse Hadoop InputFormat implementations in Flink
directly.  This is likely a good route.  See Flink's HadoopInputFormat
class.

-Jamie





-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com