You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by Amol Kekre <am...@datatorrent.com> on 2015/09/28 20:10:45 UTC

Re: [malhar-users] Re: How to use FileSplitter to read huge file

Routing to dev@apex

Amol


On Mon, Sep 28, 2015 at 1:58 AM, Chiru <ch...@gmail.com> wrote:

> Hi Pramod,
>
> thanks for the reply, it is working..
>
> And i have one more query on it, How to decide the block size?
>
> as per my understanding the
>
> noofBlocks=filesize / blocksize
>
> By this some records may be split into two blocks, when converting the
> record we dont have the complete data in one block.
>
> how to handle this?
>
> thanks in adavance.
>
> Thanks -Chiru
>
>
> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>
>> Hi All,
>>
>> I would like to read a large file using filesplitter and emit tuples.So i
>> have writtent the code like below.
>>
>>
>> public class Reader extends
>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>
>> @Override
>> protected Data convertToRecord(byte[] data)  { ///
>> }
>>
>> }
>>
>>
>> In my application class  i have created the object for filesplitter and
>> Reader classes and connect through stream.
>>
>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>> reader.blocksMetadataInput)
>>
>>
>> In properties file iam passing the directory path
>> like <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>
>> when i run the application iam getting the below error:
>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>> engine.StreamingContainer run - Abandoning deployment of operator
>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>> 0,
>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>> due to setup failure.*
>> *java.lang.IllegalArgumentException: empty files*
>>
>>
>> Please suggest is my approach is correct or not?
>> how to read data using Filesplitter using malhar-library-3.1.0.jar ,
>> share any sample code line.
>>
>> thanks _Chiranjeevi
>>
> --
> You received this message because you are subscribed to the Google Groups
> "Malhar" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to malhar-users+unsubscribe@googlegroups.com.
> To post to this group, send email to malhar-users@googlegroups.com.
> Visit this group at http://groups.google.com/group/malhar-users.
> For more options, visit https://groups.google.com/d/optout.
>

Re: [malhar-users] Re: How to use FileSplitter to read huge file

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Thanks Chandni

On Wed, Oct 28, 2015 at 11:31 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Folks,
>
> You can comment on these pull requests:
> https://github.com/DataTorrent/docs/pull/2
> https://github.com/DataTorrent/docs/pull/3
>
> Chandni
>
>
> On Wed, Oct 28, 2015 at 11:16 AM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
> > HI All,
> >
> > We have created tutorials for FileSplitter and BlockReader here:
> >
> >
> https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
> >
> >
> https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md
> >
> > Please have a look. Any feedback is appreciated.
> >
> > Thanks,
> > Chandni
> >
> > On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <ch...@datatorrent.com>
> > wrote:
> >
> >> Hi Vk,
> >>
> >> Please find  a CSV block reader here and let me know if you have
> >> questions. I have also added a test and it seems to be working fine.
> >>
> >>
> >>
> https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader
> >>
> >> Please note that the BlockReader api has changed from the one you have
> >> been using considerably.
> >>
> >> Thanks,
> >> Chandni
> >>
> >> On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <chandni@datatorrent.com
> >
> >> wrote:
> >>
> >>> Hi vk,
> >>>
> >>> I think you don't need to override readBlock() in AbstractBlockReader.
> >>>
> >>> A simpler way to do this will be using ReadAheadLineReaderContext as
> the
> >>> readerContext and providing implementation of converting bytes to the
> CSV
> >>> bean.
> >>>
> >>> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
> >>> {
> >>>   public CSVBeanReader()
> >>>   {
> >>>     this.readerContext = new
> ReaderContext.ReadAheadLineReaderContext<>();
> >>>   }
> >>>
> >>>   @Override protected CSVBean convertToRecord(byte[] bytes)
> >>>   {
> >>>     //TODO: convert bytes to bean
> >>>     return new CSVBean(bytes);
> >>>   }
> >>> }
> >>>
> >>> Are you using supercsv? I think there is a way to convert bytes to a
> CSV
> >>> Record using it and I may have that example somewhere which I will
> look up
> >>> and let you know.
> >>>
> >>> Chandni
> >>>
> >>>
> >>>
> >>> On Tue, Sep 29, 2015 at 2:06 PM, vk <ve...@gmail.com>
> >>> wrote:
> >>>
> >>>>  Here is a detailed description of the problem.
> >>>>
> >>>>>
> >>>>> My file size : *7,590,177  bytes*
> >>>>>
> >>>>> FIle splitter block size config :
> >>>>>
> >>>>>
> >>>>>
> <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
> >>>>>  <value>*16806*</value>
> >>>>>
> >>>>>
> >>>>> *MyBlockReader Implementation:*
> >>>>>
> >>>>> @Override
> >>>>> protected void readBlock(BlockMetadata blockMetadata) throws
> >>>>> IOException {
> >>>>> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
> >>>>> ReaderContext.Entity entity;
> >>>>> csvReader = new CsvBeanReader(new InputStreamReader(stream,
> >>>>> encoding),csvPreference);
> >>>>> while ((entity = readerContext.next()) != null) {
> >>>>>
> >>>>>
> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
> >>>>> Packages record = convertToRecord(entity.getRecord());
> >>>>> //if record is partial, ignore the record
> >>>>> if (record != null) {
> >>>>> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
> >>>>> data.emit(record);
> >>>>> }
> >>>>> }
> >>>>> }
> >>>>>
> >>>>>
> >>>>> @Override
> >>>>> protected Packages convertToRecord(byte[] data) {
> >>>>> Packages bean = null;
> >>>>> try {
> >>>>> bean = csvReader.read(Packages.class,Packages.COLS);
> >>>>> } catch (IOException e) {
> >>>>> e.printStackTrace();
> >>>>> }
> >>>>> return bean;
> >>>>> }
> >>>>>
> >>>>>
> >>>>> Based on the above, when blocks are created a record might be split
> >>>>> into two different blocks. When reading the blocks and converting
> them to
> >>>>> beans, it has to set the offset values appropriately to merge the
> split
> >>>>> record into one and process it. It looks like this implementation
> >>>>> is already handled in the API when *readerContext.initialize(stream,
> >>>>> blockMetadata, consecutiveBlock)* is called, but when tried to
> >>>>> execute with the above snippet, the following error is thrown
> because of
> >>>>> the split record. Can you please suggest?
> >>>>>
> >>>>> *Exception:*
> >>>>>
> >>>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
> >>>>> engine.StreamingContainer run - Operator set
> >>>>>
> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
> >>>>> 0,
> >>>>>
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
> >>>>> stopped running due to an exception.
> >>>>> *java.lang.IllegalArgumentException: the nameMapping array and the
> >>>>> number of columns read should be the same size (nameMapping length =
> 24,
> >>>>> columns = 5)*
> >>>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
> >>>>> at
> >>>>>
> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
> >>>>> at
> >>>>>
> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
> >>>>> at
> >>>>>
> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
> >>>>> at
> >>>>>
> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
> >>>>> at
> >>>>>
> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
> >>>>>
> >>>>>
> >>>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
> >>>>>
> >>>>>
> >>>>> Routing to dev@apex
> >>>>>
> >>>>> Amol
> >>>>>
> >>>>>
> >>>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <ch...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Pramod,
> >>>>>>
> >>>>>> thanks for the reply, it is working..
> >>>>>>
> >>>>>> And i have one more query on it, How to decide the block size?
> >>>>>>
> >>>>>> as per my understanding the
> >>>>>>
> >>>>>> noofBlocks=filesize / blocksize
> >>>>>>
> >>>>>> By this some records may be split into two blocks, when converting
> >>>>>> the record we dont have the complete data in one block.
> >>>>>>
> >>>>>> how to handle this?
> >>>>>>
> >>>>>> thanks in adavance.
> >>>>>>
> >>>>>> Thanks -Chiru
> >>>>>>
> >>>>>>
> >>>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
> >>>>>>>
> >>>>>>> Hi All,
> >>>>>>>
> >>>>>>> I would like to read a large file using filesplitter and emit
> >>>>>>> tuples.So i have writtent the code like below.
> >>>>>>>
> >>>>>>>
> >>>>>>> public class Reader extends
> >>>>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> protected Data convertToRecord(byte[] data)  { ///
> >>>>>>> }
> >>>>>>>
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> In my application class  i have created the object for filesplitter
> >>>>>>> and Reader classes and connect through stream.
> >>>>>>>
> >>>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
> >>>>>>> reader.blocksMetadataInput)
> >>>>>>>
> >>>>>>>
> >>>>>>> In properties file iam passing the directory path
> >>>>>>> like
> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
> >>>>>>>
> >>>>>>> when i run the application iam getting the below error:
> >>>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
> >>>>>>> engine.StreamingContainer run - Abandoning deployment of operator
> >>>>>>>
> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
> >>>>>>> 0,
> >>>>>>>
> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
> >>>>>>> due to setup failure.*
> >>>>>>> *java.lang.IllegalArgumentException: empty files*
> >>>>>>>
> >>>>>>>
> >>>>>>> Please suggest is my approach is correct or not?
> >>>>>>> how to read data using Filesplitter using malhar-library-3.1.0.jar
> ,
> >>>>>>> share any sample code line.
> >>>>>>>
> >>>>>>> thanks _Chiranjeevi
> >>>>>>>
> >>>>>> --
> >>>>>> You received this message because you are subscribed to the Google
> >>>>>> Groups "Malhar" group.
> >>>>>> To unsubscribe from this group and stop receiving emails from it,
> >>>>>> send an email to malhar-users...@googlegroups.com.
> >>>>>> To post to this group, send email to malhar...@googlegroups.com.
> >>>>>> Visit this group at http://groups.google.com/group/malhar-users.
> >>>>>> For more options, visit https://groups.google.com/d/optout.
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>
> >
>

Re: [malhar-users] Re: How to use FileSplitter to read huge file

Posted by Chandni Singh <ch...@datatorrent.com>.
Folks,

You can comment on these pull requests:
https://github.com/DataTorrent/docs/pull/2
https://github.com/DataTorrent/docs/pull/3

Chandni


On Wed, Oct 28, 2015 at 11:16 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> HI All,
>
> We have created tutorials for FileSplitter and BlockReader here:
>
> https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
>
> https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md
>
> Please have a look. Any feedback is appreciated.
>
> Thanks,
> Chandni
>
> On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
>> Hi Vk,
>>
>> Please find  a CSV block reader here and let me know if you have
>> questions. I have also added a test and it seems to be working fine.
>>
>>
>> https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader
>>
>> Please note that the BlockReader api has changed from the one you have
>> been using considerably.
>>
>> Thanks,
>> Chandni
>>
>> On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <ch...@datatorrent.com>
>> wrote:
>>
>>> Hi vk,
>>>
>>> I think you don't need to override readBlock() in AbstractBlockReader.
>>>
>>> A simpler way to do this will be using ReadAheadLineReaderContext as the
>>> readerContext and providing implementation of converting bytes to the CSV
>>> bean.
>>>
>>> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
>>> {
>>>   public CSVBeanReader()
>>>   {
>>>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
>>>   }
>>>
>>>   @Override protected CSVBean convertToRecord(byte[] bytes)
>>>   {
>>>     //TODO: convert bytes to bean
>>>     return new CSVBean(bytes);
>>>   }
>>> }
>>>
>>> Are you using supercsv? I think there is a way to convert bytes to a CSV
>>> Record using it and I may have that example somewhere which I will look up
>>> and let you know.
>>>
>>> Chandni
>>>
>>>
>>>
>>> On Tue, Sep 29, 2015 at 2:06 PM, vk <ve...@gmail.com>
>>> wrote:
>>>
>>>>  Here is a detailed description of the problem.
>>>>
>>>>>
>>>>> My file size : *7,590,177  bytes*
>>>>>
>>>>> FIle splitter block size config :
>>>>>
>>>>>
>>>>>  <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>>>>  <value>*16806*</value>
>>>>>
>>>>>
>>>>> *MyBlockReader Implementation:*
>>>>>
>>>>> @Override
>>>>> protected void readBlock(BlockMetadata blockMetadata) throws
>>>>> IOException {
>>>>> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>>>>> ReaderContext.Entity entity;
>>>>> csvReader = new CsvBeanReader(new InputStreamReader(stream,
>>>>> encoding),csvPreference);
>>>>> while ((entity = readerContext.next()) != null) {
>>>>>
>>>>> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>>>>> Packages record = convertToRecord(entity.getRecord());
>>>>> //if record is partial, ignore the record
>>>>> if (record != null) {
>>>>> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>>>>> data.emit(record);
>>>>> }
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>> @Override
>>>>> protected Packages convertToRecord(byte[] data) {
>>>>> Packages bean = null;
>>>>> try {
>>>>> bean = csvReader.read(Packages.class,Packages.COLS);
>>>>> } catch (IOException e) {
>>>>> e.printStackTrace();
>>>>> }
>>>>> return bean;
>>>>> }
>>>>>
>>>>>
>>>>> Based on the above, when blocks are created a record might be split
>>>>> into two different blocks. When reading the blocks and converting them to
>>>>> beans, it has to set the offset values appropriately to merge the split
>>>>> record into one and process it. It looks like this implementation
>>>>> is already handled in the API when *readerContext.initialize(stream,
>>>>> blockMetadata, consecutiveBlock)* is called, but when tried to
>>>>> execute with the above snippet, the following error is thrown because of
>>>>> the split record. Can you please suggest?
>>>>>
>>>>> *Exception:*
>>>>>
>>>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
>>>>> engine.StreamingContainer run - Operator set
>>>>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
>>>>> 0,
>>>>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>>>>> stopped running due to an exception.
>>>>> *java.lang.IllegalArgumentException: the nameMapping array and the
>>>>> number of columns read should be the same size (nameMapping length = 24,
>>>>> columns = 5)*
>>>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>>>>> at
>>>>> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>>>>> at
>>>>> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>>>>> at
>>>>> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>>>>> at
>>>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>>>>> at
>>>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>>>>
>>>>>
>>>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>>>>
>>>>>
>>>>> Routing to dev@apex
>>>>>
>>>>> Amol
>>>>>
>>>>>
>>>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <ch...@gmail.com> wrote:
>>>>>
>>>>>> Hi Pramod,
>>>>>>
>>>>>> thanks for the reply, it is working..
>>>>>>
>>>>>> And i have one more query on it, How to decide the block size?
>>>>>>
>>>>>> as per my understanding the
>>>>>>
>>>>>> noofBlocks=filesize / blocksize
>>>>>>
>>>>>> By this some records may be split into two blocks, when converting
>>>>>> the record we dont have the complete data in one block.
>>>>>>
>>>>>> how to handle this?
>>>>>>
>>>>>> thanks in adavance.
>>>>>>
>>>>>> Thanks -Chiru
>>>>>>
>>>>>>
>>>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I would like to read a large file using filesplitter and emit
>>>>>>> tuples.So i have writtent the code like below.
>>>>>>>
>>>>>>>
>>>>>>> public class Reader extends
>>>>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>>>>>>
>>>>>>> @Override
>>>>>>> protected Data convertToRecord(byte[] data)  { ///
>>>>>>> }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> In my application class  i have created the object for filesplitter
>>>>>>> and Reader classes and connect through stream.
>>>>>>>
>>>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>>>>>>> reader.blocksMetadataInput)
>>>>>>>
>>>>>>>
>>>>>>> In properties file iam passing the directory path
>>>>>>> like <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>>>>
>>>>>>> when i run the application iam getting the below error:
>>>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>>>>>>> engine.StreamingContainer run - Abandoning deployment of operator
>>>>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>>>>>>> 0,
>>>>>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>>>>> due to setup failure.*
>>>>>>> *java.lang.IllegalArgumentException: empty files*
>>>>>>>
>>>>>>>
>>>>>>> Please suggest is my approach is correct or not?
>>>>>>> how to read data using Filesplitter using malhar-library-3.1.0.jar ,
>>>>>>> share any sample code line.
>>>>>>>
>>>>>>> thanks _Chiranjeevi
>>>>>>>
>>>>>> --
>>>>>> You received this message because you are subscribed to the Google
>>>>>> Groups "Malhar" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>> send an email to malhar-users...@googlegroups.com.
>>>>>> To post to this group, send email to malhar...@googlegroups.com.
>>>>>> Visit this group at http://groups.google.com/group/malhar-users.
>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>
>>>>>
>>>>>
>>>
>>
>

Re: [malhar-users] Re: How to use FileSplitter to read huge file

Posted by Chandni Singh <ch...@datatorrent.com>.
HI All,

We have created tutorials for FileSplitter and BlockReader here:
https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md

Please have a look. Any feedback is appreciated.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Hi Vk,
>
> Please find  a CSV block reader here and let me know if you have
> questions. I have also added a test and it seems to be working fine.
>
>
> https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader
>
> Please note that the BlockReader api has changed from the one you have
> been using considerably.
>
> Thanks,
> Chandni
>
> On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
>> Hi vk,
>>
>> I think you don't need to override readBlock() in AbstractBlockReader.
>>
>> A simpler way to do this will be using ReadAheadLineReaderContext as the
>> readerContext and providing implementation of converting bytes to the CSV
>> bean.
>>
>> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
>> {
>>   public CSVBeanReader()
>>   {
>>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
>>   }
>>
>>   @Override protected CSVBean convertToRecord(byte[] bytes)
>>   {
>>     //TODO: convert bytes to bean
>>     return new CSVBean(bytes);
>>   }
>> }
>>
>> Are you using supercsv? I think there is a way to convert bytes to a CSV
>> Record using it and I may have that example somewhere which I will look up
>> and let you know.
>>
>> Chandni
>>
>>
>>
>> On Tue, Sep 29, 2015 at 2:06 PM, vk <ve...@gmail.com>
>> wrote:
>>
>>>  Here is a detailed description of the problem.
>>>
>>>>
>>>> My file size : *7,590,177  bytes*
>>>>
>>>> FIle splitter block size config :
>>>>
>>>>
>>>>  <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>>>  <value>*16806*</value>
>>>>
>>>>
>>>> *MyBlockReader Implementation:*
>>>>
>>>> @Override
>>>> protected void readBlock(BlockMetadata blockMetadata) throws
>>>> IOException {
>>>> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>>>> ReaderContext.Entity entity;
>>>> csvReader = new CsvBeanReader(new InputStreamReader(stream,
>>>> encoding),csvPreference);
>>>> while ((entity = readerContext.next()) != null) {
>>>> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>>>> Packages record = convertToRecord(entity.getRecord());
>>>> //if record is partial, ignore the record
>>>> if (record != null) {
>>>> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>>>> data.emit(record);
>>>> }
>>>> }
>>>> }
>>>>
>>>>
>>>> @Override
>>>> protected Packages convertToRecord(byte[] data) {
>>>> Packages bean = null;
>>>> try {
>>>> bean = csvReader.read(Packages.class,Packages.COLS);
>>>> } catch (IOException e) {
>>>> e.printStackTrace();
>>>> }
>>>> return bean;
>>>> }
>>>>
>>>>
>>>> Based on the above, when blocks are created a record might be split
>>>> into two different blocks. When reading the blocks and converting them to
>>>> beans, it has to set the offset values appropriately to merge the split
>>>> record into one and process it. It looks like this implementation
>>>> is already handled in the API when *readerContext.initialize(stream,
>>>> blockMetadata, consecutiveBlock)* is called, but when tried to execute
>>>> with the above snippet, the following error is thrown because of the split
>>>> record. Can you please suggest?
>>>>
>>>> *Exception:*
>>>>
>>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
>>>> engine.StreamingContainer run - Operator set
>>>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
>>>> 0,
>>>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>>>> stopped running due to an exception.
>>>> *java.lang.IllegalArgumentException: the nameMapping array and the
>>>> number of columns read should be the same size (nameMapping length = 24,
>>>> columns = 5)*
>>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>>>> at
>>>> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>>>> at
>>>> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>>>> at
>>>> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>>>> at
>>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>>>> at
>>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>>>
>>>>
>>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>>>
>>>>
>>>> Routing to dev@apex
>>>>
>>>> Amol
>>>>
>>>>
>>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <ch...@gmail.com> wrote:
>>>>
>>>>> Hi Pramod,
>>>>>
>>>>> thanks for the reply, it is working..
>>>>>
>>>>> And i have one more query on it, How to decide the block size?
>>>>>
>>>>> as per my understanding the
>>>>>
>>>>> noofBlocks=filesize / blocksize
>>>>>
>>>>> By this some records may be split into two blocks, when converting the
>>>>> record we dont have the complete data in one block.
>>>>>
>>>>> how to handle this?
>>>>>
>>>>> thanks in adavance.
>>>>>
>>>>> Thanks -Chiru
>>>>>
>>>>>
>>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I would like to read a large file using filesplitter and emit
>>>>>> tuples.So i have writtent the code like below.
>>>>>>
>>>>>>
>>>>>> public class Reader extends
>>>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>>>>>
>>>>>> @Override
>>>>>> protected Data convertToRecord(byte[] data)  { ///
>>>>>> }
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> In my application class  i have created the object for filesplitter
>>>>>> and Reader classes and connect through stream.
>>>>>>
>>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>>>>>> reader.blocksMetadataInput)
>>>>>>
>>>>>>
>>>>>> In properties file iam passing the directory path
>>>>>> like <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>>>
>>>>>> when i run the application iam getting the below error:
>>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>>>>>> engine.StreamingContainer run - Abandoning deployment of operator
>>>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>>>>>> 0,
>>>>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>>>> due to setup failure.*
>>>>>> *java.lang.IllegalArgumentException: empty files*
>>>>>>
>>>>>>
>>>>>> Please suggest is my approach is correct or not?
>>>>>> how to read data using Filesplitter using malhar-library-3.1.0.jar ,
>>>>>> share any sample code line.
>>>>>>
>>>>>> thanks _Chiranjeevi
>>>>>>
>>>>> --
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Malhar" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to malhar-users...@googlegroups.com.
>>>>> To post to this group, send email to malhar...@googlegroups.com.
>>>>> Visit this group at http://groups.google.com/group/malhar-users.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>
>>
>

Re: [malhar-users] Re: How to use FileSplitter to read huge file

Posted by Chandni Singh <ch...@datatorrent.com>.
Hi Vk,

Please find  a CSV block reader here and let me know if you have questions.
I have also added a test and it seems to be working fine.

https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader

Please note that the BlockReader api has changed from the one you have been
using considerably.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Hi vk,
>
> I think you don't need to override readBlock() in AbstractBlockReader.
>
> A simpler way to do this will be using ReadAheadLineReaderContext as the
> readerContext and providing implementation of converting bytes to the CSV
> bean.
>
> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
> {
>   public CSVBeanReader()
>   {
>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
>   }
>
>   @Override protected CSVBean convertToRecord(byte[] bytes)
>   {
>     //TODO: convert bytes to bean
>     return new CSVBean(bytes);
>   }
> }
>
> Are you using supercsv? I think there is a way to convert bytes to a CSV
> Record using it and I may have that example somewhere which I will look up
> and let you know.
>
> Chandni
>
>
>
> On Tue, Sep 29, 2015 at 2:06 PM, vk <ve...@gmail.com>
> wrote:
>
>>  Here is a detailed description of the problem.
>>
>>>
>>> My file size : *7,590,177  bytes*
>>>
>>> FIle splitter block size config :
>>>
>>>
>>>  <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>>  <value>*16806*</value>
>>>
>>>
>>> *MyBlockReader Implementation:*
>>>
>>> @Override
>>> protected void readBlock(BlockMetadata blockMetadata) throws IOException
>>> {
>>> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>>> ReaderContext.Entity entity;
>>> csvReader = new CsvBeanReader(new InputStreamReader(stream,
>>> encoding),csvPreference);
>>> while ((entity = readerContext.next()) != null) {
>>> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>>> Packages record = convertToRecord(entity.getRecord());
>>> //if record is partial, ignore the record
>>> if (record != null) {
>>> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>>> data.emit(record);
>>> }
>>> }
>>> }
>>>
>>>
>>> @Override
>>> protected Packages convertToRecord(byte[] data) {
>>> Packages bean = null;
>>> try {
>>> bean = csvReader.read(Packages.class,Packages.COLS);
>>> } catch (IOException e) {
>>> e.printStackTrace();
>>> }
>>> return bean;
>>> }
>>>
>>>
>>> Based on the above, when blocks are created a record might be split into
>>> two different blocks. When reading the blocks and converting them to beans,
>>> it has to set the offset values appropriately to merge the split record
>>> into one and process it. It looks like this implementation
>>> is already handled in the API when *readerContext.initialize(stream,
>>> blockMetadata, consecutiveBlock)* is called, but when tried to execute
>>> with the above snippet, the following error is thrown because of the split
>>> record. Can you please suggest?
>>>
>>> *Exception:*
>>>
>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
>>> engine.StreamingContainer run - Operator set
>>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
>>> 0,
>>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>>> stopped running due to an exception.
>>> *java.lang.IllegalArgumentException: the nameMapping array and the
>>> number of columns read should be the same size (nameMapping length = 24,
>>> columns = 5)*
>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>>> at
>>> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>>> at
>>> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>>> at
>>> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>>> at
>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>>> at
>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>>
>>>
>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>>
>>>
>>> Routing to dev@apex
>>>
>>> Amol
>>>
>>>
>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <ch...@gmail.com> wrote:
>>>
>>>> Hi Pramod,
>>>>
>>>> thanks for the reply, it is working..
>>>>
>>>> And i have one more query on it, How to decide the block size?
>>>>
>>>> as per my understanding the
>>>>
>>>> noofBlocks=filesize / blocksize
>>>>
>>>> By this some records may be split into two blocks, when converting the
>>>> record we dont have the complete data in one block.
>>>>
>>>> how to handle this?
>>>>
>>>> thanks in adavance.
>>>>
>>>> Thanks -Chiru
>>>>
>>>>
>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I would like to read a large file using filesplitter and emit
>>>>> tuples.So i have writtent the code like below.
>>>>>
>>>>>
>>>>> public class Reader extends
>>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>>>>
>>>>> @Override
>>>>> protected Data convertToRecord(byte[] data)  { ///
>>>>> }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> In my application class  i have created the object for filesplitter
>>>>> and Reader classes and connect through stream.
>>>>>
>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>>>>> reader.blocksMetadataInput)
>>>>>
>>>>>
>>>>> In properties file iam passing the directory path
>>>>> like <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>>
>>>>> when i run the application iam getting the below error:
>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>>>>> engine.StreamingContainer run - Abandoning deployment of operator
>>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>>>>> 0,
>>>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>>> due to setup failure.*
>>>>> *java.lang.IllegalArgumentException: empty files*
>>>>>
>>>>>
>>>>> Please suggest is my approach is correct or not?
>>>>> how to read data using Filesplitter using malhar-library-3.1.0.jar ,
>>>>> share any sample code line.
>>>>>
>>>>> thanks _Chiranjeevi
>>>>>
>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Malhar" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to malhar-users...@googlegroups.com.
>>>> To post to this group, send email to malhar...@googlegroups.com.
>>>> Visit this group at http://groups.google.com/group/malhar-users.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>

Re: [malhar-users] Re: How to use FileSplitter to read huge file

Posted by Chandni Singh <ch...@datatorrent.com>.
Hi vk,

I think you don't need to override readBlock() in AbstractBlockReader.

A simpler way to do this will be using ReadAheadLineReaderContext as the
readerContext and providing implementation of converting bytes to the CSV
bean.

public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
{
  public CSVBeanReader()
  {
    this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
  }

  @Override protected CSVBean convertToRecord(byte[] bytes)
  {
    //TODO: convert bytes to bean
    return new CSVBean(bytes);
  }
}

Are you using supercsv? I think there is a way to convert bytes to a CSV
Record using it and I may have that example somewhere which I will look up
and let you know.

Chandni



On Tue, Sep 29, 2015 at 2:06 PM, vk <ve...@gmail.com> wrote:

>  Here is a detailed description of the problem.
>
>>
>> My file size : *7,590,177  bytes*
>>
>> FIle splitter block size config :
>>
>>
>>  <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>  <value>*16806*</value>
>>
>>
>> *MyBlockReader Implementation:*
>>
>> @Override
>> protected void readBlock(BlockMetadata blockMetadata) throws IOException {
>> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>> ReaderContext.Entity entity;
>> csvReader = new CsvBeanReader(new InputStreamReader(stream,
>> encoding),csvPreference);
>> while ((entity = readerContext.next()) != null) {
>> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>> Packages record = convertToRecord(entity.getRecord());
>> //if record is partial, ignore the record
>> if (record != null) {
>> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>> data.emit(record);
>> }
>> }
>> }
>>
>>
>> @Override
>> protected Packages convertToRecord(byte[] data) {
>> Packages bean = null;
>> try {
>> bean = csvReader.read(Packages.class,Packages.COLS);
>> } catch (IOException e) {
>> e.printStackTrace();
>> }
>> return bean;
>> }
>>
>>
>> Based on the above, when blocks are created a record might be split into
>> two different blocks. When reading the blocks and converting them to beans,
>> it has to set the offset values appropriately to merge the split record
>> into one and process it. It looks like this implementation
>> is already handled in the API when *readerContext.initialize(stream,
>> blockMetadata, consecutiveBlock)* is called, but when tried to execute
>> with the above snippet, the following error is thrown because of the split
>> record. Can you please suggest?
>>
>> *Exception:*
>>
>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
>> engine.StreamingContainer run - Operator set
>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
>> 0,
>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>> stopped running due to an exception.
>> *java.lang.IllegalArgumentException: the nameMapping array and the number
>> of columns read should be the same size (nameMapping length = 24, columns =
>> 5)*
>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>> at
>> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>> at
>> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>> at
>> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>> at
>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>> at
>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>
>>
> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>
>>
>> Routing to dev@apex
>>
>> Amol
>>
>>
>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <ch...@gmail.com> wrote:
>>
>>> Hi Pramod,
>>>
>>> thanks for the reply, it is working..
>>>
>>> And i have one more query on it, How to decide the block size?
>>>
>>> as per my understanding the
>>>
>>> noofBlocks=filesize / blocksize
>>>
>>> By this some records may be split into two blocks, when converting the
>>> record we dont have the complete data in one block.
>>>
>>> how to handle this?
>>>
>>> thanks in adavance.
>>>
>>> Thanks -Chiru
>>>
>>>
>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>
>>>> Hi All,
>>>>
>>>> I would like to read a large file using filesplitter and emit tuples.So
>>>> i have writtent the code like below.
>>>>
>>>>
>>>> public class Reader extends
>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>>>
>>>> @Override
>>>> protected Data convertToRecord(byte[] data)  { ///
>>>> }
>>>>
>>>> }
>>>>
>>>>
>>>> In my application class  i have created the object for filesplitter and
>>>> Reader classes and connect through stream.
>>>>
>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>>>> reader.blocksMetadataInput)
>>>>
>>>>
>>>> In properties file iam passing the directory path
>>>> like <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>
>>>> when i run the application iam getting the below error:
>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>>>> engine.StreamingContainer run - Abandoning deployment of operator
>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>>>> 0,
>>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>> due to setup failure.*
>>>> *java.lang.IllegalArgumentException: empty files*
>>>>
>>>>
>>>> Please suggest is my approach is correct or not?
>>>> how to read data using Filesplitter using malhar-library-3.1.0.jar ,
>>>> share any sample code line.
>>>>
>>>> thanks _Chiranjeevi
>>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "Malhar" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to malhar-users...@googlegroups.com.
>>> To post to this group, send email to malhar...@googlegroups.com.
>>> Visit this group at http://groups.google.com/group/malhar-users.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>

Re: [malhar-users] Re: How to use FileSplitter to read huge file

Posted by vk <ve...@gmail.com>.
 Here is a detailed description of the problem.

>
> My file size : *7,590,177  bytes*
>
> FIle splitter block size config :
>
>
>  <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>  <value>*16806*</value>
>
>
> *MyBlockReader Implementation:*
>
> @Override
> protected void readBlock(BlockMetadata blockMetadata) throws IOException {
> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
> ReaderContext.Entity entity;
> csvReader = new CsvBeanReader(new InputStreamReader(stream, 
> encoding),csvPreference);
> while ((entity = readerContext.next()) != null) {
> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
> Packages record = convertToRecord(entity.getRecord());
> //if record is partial, ignore the record
> if (record != null) {
> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
> data.emit(record);
> }
> } 
> }
>
>
> @Override
> protected Packages convertToRecord(byte[] data) {
> Packages bean = null;
> try {
> bean = csvReader.read(Packages.class,Packages.COLS);
> } catch (IOException e) {
> e.printStackTrace();
> }
> return bean;
> }
>
>
> Based on the above, when blocks are created a record might be split into 
> two different blocks. When reading the blocks and converting them to beans, 
> it has to set the offset values appropriately to merge the split record 
> into one and process it. It looks like this implementation 
> is already handled in the API when *readerContext.initialize(stream, 
> blockMetadata, consecutiveBlock)* is called, but when tried to execute 
> with the above snippet, the following error is thrown because of the split 
> record. Can you please suggest?
>
> *Exception:*
>
> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR 
> engine.StreamingContainer run - Operator set 
> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 
> 0, 
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]] 
> stopped running due to an exception.
> *java.lang.IllegalArgumentException: the nameMapping array and the number 
> of columns read should be the same size (nameMapping length = 24, columns = 
> 5)*
> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
> at 
> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
> at 
> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
> at 
> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
> at 
> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
> at 
> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>
>
On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>
>
> Routing to dev@apex
>
> Amol
>
>
> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <chir...@gmail.com <javascript:>> 
> wrote:
>
>> Hi Pramod,
>>
>> thanks for the reply, it is working..
>>
>> And i have one more query on it, How to decide the block size?
>>
>> as per my understanding the 
>>
>> noofBlocks=filesize / blocksize 
>>
>> By this some records may be split into two blocks, when converting the 
>> record we dont have the complete data in one block.
>>
>> how to handle this?
>>
>> thanks in adavance.
>>
>> Thanks -Chiru 
>>
>>
>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>
>>> Hi All,
>>>
>>> I would like to read a large file using filesplitter and emit tuples.So 
>>> i have writtent the code like below.
>>>
>>>
>>> public class Reader extends 
>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>>
>>> @Override
>>> protected Data convertToRecord(byte[] data)  { ///
>>> }
>>>
>>> }
>>>
>>>
>>> In my application class  i have created the object for filesplitter and 
>>> Reader classes and connect through stream.
>>>
>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput, 
>>> reader.blocksMetadataInput)
>>>
>>>
>>> In properties file iam passing the directory path 
>>> like <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>
>>> when i run the application iam getting the below error:
>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR 
>>> engine.StreamingContainer run - Abandoning deployment of operator 
>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff, 
>>> 0, 
>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]] 
>>> due to setup failure.*
>>> *java.lang.IllegalArgumentException: empty files*
>>>
>>>
>>> Please suggest is my approach is correct or not? 
>>> how to read data using Filesplitter using malhar-library-3.1.0.jar , 
>>> share any sample code line.
>>>
>>> thanks _Chiranjeevi
>>>
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "Malhar" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to malhar-users...@googlegroups.com <javascript:>.
>> To post to this group, send email to malhar...@googlegroups.com 
>> <javascript:>.
>> Visit this group at http://groups.google.com/group/malhar-users.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

Re: [malhar-users] Re: How to use FileSplitter to read huge file

Posted by Chandni Singh <ch...@datatorrent.com>.
Hi Chiranjeevi,

By default FileSplitter sets its block size to the hdfs default block size
which is optimal in most cases.

The BlockReaders in Malhar library handle the case when the record is split
across blocks.
AbstractFSReadAheadLineReader does this by ignoring the bytes till the
first end-of-line character and  always reading ahead a line, i.e., even if
a line boundary coincides with the block boundary, it reads next line from
the next block.

Chandni