You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Wei Zhong <we...@gmail.com> on 2020/12/07 12:02:32 UTC

Re: Urgent help on S3 CSV file reader DataStream Job

Hi Deep,

(redirecting this to user mailing list as this is not a dev question)

You can try to set the line delimiter and field delimiter of the RowCsvInputFormat to a non-printing character (assume there is no non-printing characters in the csv files). It will read all the content of a csv file into one Row. e.g.

final StreamExecutionEnvironment env =
   StreamExecutionEnvironment.getExecutionEnvironment();
String path = "test";
TypeInformation[] fieldTypes = new TypeInformation[]{
   BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat = 
   new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setNestedFileEnumeration(true);
csvFormat.setDelimiter((char) 0);
csvFormat.setFieldDelimiter(String.valueOf((char) 0));
DataStream<Row>
   lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
   -1);lines.map(value -> value).print();
env.execute();

Then you can convert the content of the csv files to json manually.

Best,
Wei


> 在 2020年12月7日,19:10,DEEP NARAYAN Singh <ab...@gmail.com> 写道:
> 
> Hi  Guys,
> 
> Below is my code snippet , which read all csv files under the given folder
> row by row but my requirement is to read csv file at a time and  convert as
> json which will looks like :
> {"A":"1","B":"3","C":"4","D":9}
> 
> Csv file data format   :
> -------------------------------
> *field_id,data,*
> 
> 
> 
> *A,1B,3C,4D,9*
> 
> Code snippet:
> --------------------------
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> *final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();String path =
> "s3://messages/data/test/dev/2020-12-07/67241306/";TypeInformation[]
> fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
>  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
> RowCsvInputFormat(            new Path(path),
> fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> -1);lines.map(value -> value).print();*
> 
> 
> Any help is highly appreciated.
> 
> Thanks,
> -Deep


Re: Urgent help on S3 CSV file reader DataStream Job

Posted by Wei Zhong <we...@gmail.com>.
Hi Deep,

You can try to change the `FileProcessingMode.PROCESS_ONCE` to `FileProcessingMode.PROCESS_CONTINUOUSLY`.

Best,
Wei

> 在 2020年12月15日,20:18,DEEP NARAYAN Singh <ab...@gmail.com> 写道:
> 
> Hi Wei,
> Could you please suggest , how to fix this below issues.
> 
> Thanks & Regards,
> Deep
> 
> On Mon, 14 Dec, 2020, 10:28 AM DEEP NARAYAN Singh, <about.deep@gmail.com <ma...@gmail.com>> wrote:
> Hi Wei,
> No problem at all.Thanks for your response.
> Yes ,it is just starting from the beginning like no check pointing finished.
> 
> Thanks,
> -Deep
> 
> On Mon, 14 Dec, 2020, 8:01 AM Wei Zhong, <weizhong0618@gmail.com <ma...@gmail.com>> wrote:
> Hi Deep,
> 
> Sorry for the late reply. Could you provide more specific information about the problem? e.g. did the job skip the file that was being processed during the last checkpointing, or did it start from the beginning just like no checkpointing finished?
> 
> Best,
> Wei
> 
>> 在 2020年12月12日,13:14,DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> 写道:
>> 
>> Hi Wei,
>> I'm sorry to bother you ,could you please help me in clarifying my doubt which have mentioned in previous email?
>> 
>> Thank you in advance.
>> 
>> Regards,
>> -Deep
>> 
>> On Fri, 11 Dec, 2020, 2:16 PM DEEP NARAYAN Singh, <about.deep@gmail.com <ma...@gmail.com>> wrote:
>> Hi Wei,
>> Just I want to clarify my doubt about check pointing as part of s3 datastream source . Let say my job started with a current resource and it got failed in between because of some lack of resource (e.g  Heap space Exception etc.), In that case what I observed was that if the job is auto restart by using restart strategy , it was not processing the data from the last checkpointing .
>> 
>> Could you please help me in how to handle this case as part of s3 data source.
>> 
>> Thanks,
>> -Deep
>> 
>> On Tue, Dec 8, 2020 at 10:22 PM DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> wrote:
>> Hi Wei,
>> Thanks you for the clarification. I have implemented the suggest approach and it is working fine now.🙂
>> 
>> Thanks,
>> -Deep
>> 
>> On Tue, 8 Dec, 2020, 5:24 PM Wei Zhong, <weizhong0618@gmail.com <ma...@gmail.com>> wrote:
>> Hi Deep,
>> 
>> It seems that the TypeInformation array in your code has 2 elements, but we only need one here. This approach treats the entire csv file as a Row which has only a one column, so there should be only one `BasicTypeInfo.STRING_TYPE_INFO` in the array. And if you use the TextInputFormat instead of the RowCsvInputFormat, this problem can also be solved.
>> 
>> If you have created your own InputFormat via extending the RowCsvInputFormat, you can get the current file path via `this.currentSplit.getPath()` in your class. Note that if you choose to fill the file path into the second column of the Row, you do not need to make the above changes, because at this time we really need the TypeInformation array to contain two StringTypeInfo elements.
>> 
>> Best,
>> Wei
>> 
>> 
>>> 在 2020年12月8日,19:29,DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> 写道:
>>> 
>>> Hi Wei, 
>>> 
>>> Also I need to know how to get file names  along with single Row data as part of Datastream during runtime.So that I can  extract some of the data from the file name in the next operator to construct the final json string.
>>> 
>>> Thanks,
>>> -Deep
>>> 
>>> On Tue, Dec 8, 2020 at 4:10 PM DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> wrote:
>>> Hi Wei, 
>>> 
>>> Please find the below code snippet:
>>> TypeInformation[] typeInformation = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
>>> RowCsvInputFormat csvInputFormat = new RowCsvInputFormat(new org.apache.flink.core.fs.Path(directory), typeInformation);
>>> csvInputFormat.setDelimiter((char) 0);
>>> csvInputFormat.setFieldDelimiter(String.valueOf((char) 0));
>>> csvInputFormat.setNestedFileEnumeration(true);
>>> csvInputFormat.setMinSplitSize(10);
>>> return environment
>>>         .readFile(csvInputFormat, directory, FileProcessingMode.PROCESS_ONCE, -1, S3Service.createCustomFilter(finalParameters))
>>>         .name("Source: Custom File Reader for path " + directory).setParallelism(readerParallelism);
>>> 
>>> But after that,I have created my own custom RowCsvInputFormat and enabled the csvInputFormat.setLenient(true) and modified the class a little bit then it worked.
>>> 
>>> // check valid start position
>>> if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) {
>>>    if (isLenient()) {
>>>       return true;
>>>    } else {
>>>       throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
>>>    }
>>> }
>>> Let me know if you need any details.
>>> Thanks,
>>> -Deep
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Dec 8, 2020 at 8:13 AM Wei Zhong <weizhong0618@gmail.com <ma...@gmail.com>> wrote:
>>> Hi Deep,
>>> 
>>> Could you show your current code snippet? I have tried the Csv file data on my local machine and it works fine, so I guess what might be wrong elsewhere.
>>> 
>>> Best,
>>> Wei
>>> 
>>> 
>>> > 在 2020年12月8日,03:20,DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> 写道:
>>> > 
>>> > Hi Wei and Till,
>>> > Thanks for the quick reply.
>>> > 
>>> > @Wei, I tried with code which you have suggested and it is working fine but I have one use case where it is failing, below is the csv input data format :
>>> > Csv file data format   :
>>> > -------------------------------
>>> > field_id,data,
>>> > A,1
>>> > B,3
>>> > C,4
>>> > D,9
>>> > E,0,0,0,0
>>> > 
>>> > because of last row which contains more that two value, and its is throwing org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,
>>> > 
>>> > How to handle the above corner case.Could you please suggest some way to handle this.
>>> > 
>>> > @Till, Could you please elaborate more which you are suggesting? As per my use case I am dealing with multiple csv files under the given folder and reading line by line using TextInputFormat  and transform will not work by using map operator. Correct me if i'm wrong .
>>> > 
>>> > Thanks & Regards,
>>> > -Deep
>>> > 
>>> > 
>>> > On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <trohrmann@apache.org <ma...@apache.org>> wrote:
>>> > Hi Deep,
>>> > 
>>> > Could you use the TextInputFormat which reads a file line by line? That way
>>> > you can do the JSON parsing as part of a mapper which consumes the file
>>> > lines.
>>> > 
>>> > Cheers,
>>> > Till
>>> > 
>>> > On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <weizhong0618@gmail.com <ma...@gmail.com>> wrote:
>>> > 
>>> > > Hi Deep,
>>> > >
>>> > > (redirecting this to user mailing list as this is not a dev question)
>>> > >
>>> > > You can try to set the line delimiter and field delimiter of the
>>> > > RowCsvInputFormat to a non-printing character (assume there is no non-printing
>>> > > characters in the csv files). It will read all the content of a csv file
>>> > > into one Row. e.g.
>>> > >
>>> > > final StreamExecutionEnvironment env =
>>> > >    StreamExecutionEnvironment.getExecutionEnvironment();
>>> > > String path = "test";
>>> > > TypeInformation[] fieldTypes = new TypeInformation[]{
>>> > >    BasicTypeInfo.STRING_TYPE_INFO};
>>> > > RowCsvInputFormat csvFormat =
>>> > >    new RowCsvInputFormat(new Path(path), fieldTypes);
>>> > > csvFormat.setNestedFileEnumeration(true);
>>> > > csvFormat.setDelimiter((char) 0);
>>> > > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
>>> > > DataStream<Row>
>>> > >    lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
>>> > >    -1);lines.map(value -> value).print();
>>> > > env.execute();
>>> > >
>>> > >
>>> > > Then you can convert the content of the csv files to json manually.
>>> > >
>>> > > Best,
>>> > > Wei
>>> > >
>>> > >
>>> > > 在 2020年12月7日,19:10,DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> 写道:
>>> > >
>>> > > Hi  Guys,
>>> > >
>>> > > Below is my code snippet , which read all csv files under the given folder
>>> > > row by row but my requirement is to read csv file at a time and  convert as
>>> > > json which will looks like :
>>> > > {"A":"1","B":"3","C":"4","D":9}
>>> > >
>>> > > Csv file data format   :
>>> > > -------------------------------
>>> > > *field_id,data,*
>>> > >
>>> > >
>>> > >
>>> > > *A,1B,3C,4D,9*
>>> > >
>>> > > Code snippet:
>>> > > --------------------------
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > > *final StreamExecutionEnvironment env =
>>> > > StreamExecutionEnvironment.getExecutionEnvironment();String path =
>>> > > "s3://messages/data/test/dev/2020-12-07/67241306/ <>";TypeInformation[]
>>> > > fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
>>> > >  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
>>> > > RowCsvInputFormat(            new Path(path),
>>> > >
>>> > > fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
>>> > > lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
>>> > > -1);lines.map(value -> value).print();*
>>> > >
>>> > >
>>> > > Any help is highly appreciated.
>>> > >
>>> > > Thanks,
>>> > > -Deep
>>> > >
>>> > >
>>> > >
>>> 
>> 
> 


Re: Urgent help on S3 CSV file reader DataStream Job

Posted by Wei Zhong <we...@gmail.com>.
Hi Deep,

It seems that the TypeInformation array in your code has 2 elements, but we only need one here. This approach treats the entire csv file as a Row which has only a one column, so there should be only one `BasicTypeInfo.STRING_TYPE_INFO` in the array. And if you use the TextInputFormat instead of the RowCsvInputFormat, this problem can also be solved.

If you have created your own InputFormat via extending the RowCsvInputFormat, you can get the current file path via `this.currentSplit.getPath()` in your class. Note that if you choose to fill the file path into the second column of the Row, you do not need to make the above changes, because at this time we really need the TypeInformation array to contain two StringTypeInfo elements.

Best,
Wei


> 在 2020年12月8日,19:29,DEEP NARAYAN Singh <ab...@gmail.com> 写道:
> 
> Hi Wei, 
> 
> Also I need to know how to get file names  along with single Row data as part of Datastream during runtime.So that I can  extract some of the data from the file name in the next operator to construct the final json string.
> 
> Thanks,
> -Deep
> 
> On Tue, Dec 8, 2020 at 4:10 PM DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> wrote:
> Hi Wei, 
> 
> Please find the below code snippet:
> TypeInformation[] typeInformation = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
> RowCsvInputFormat csvInputFormat = new RowCsvInputFormat(new org.apache.flink.core.fs.Path(directory), typeInformation);
> csvInputFormat.setDelimiter((char) 0);
> csvInputFormat.setFieldDelimiter(String.valueOf((char) 0));
> csvInputFormat.setNestedFileEnumeration(true);
> csvInputFormat.setMinSplitSize(10);
> return environment
>         .readFile(csvInputFormat, directory, FileProcessingMode.PROCESS_ONCE, -1, S3Service.createCustomFilter(finalParameters))
>         .name("Source: Custom File Reader for path " + directory).setParallelism(readerParallelism);
> 
> But after that,I have created my own custom RowCsvInputFormat and enabled the csvInputFormat.setLenient(true) and modified the class a little bit then it worked.
> 
> // check valid start position
> if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) {
>    if (isLenient()) {
>       return true;
>    } else {
>       throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
>    }
> }
> Let me know if you need any details.
> Thanks,
> -Deep
> 
> 
> 
> 
> 
> On Tue, Dec 8, 2020 at 8:13 AM Wei Zhong <weizhong0618@gmail.com <ma...@gmail.com>> wrote:
> Hi Deep,
> 
> Could you show your current code snippet? I have tried the Csv file data on my local machine and it works fine, so I guess what might be wrong elsewhere.
> 
> Best,
> Wei
> 
> 
> > 在 2020年12月8日,03:20,DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> 写道:
> > 
> > Hi Wei and Till,
> > Thanks for the quick reply.
> > 
> > @Wei, I tried with code which you have suggested and it is working fine but I have one use case where it is failing, below is the csv input data format :
> > Csv file data format   :
> > -------------------------------
> > field_id,data,
> > A,1
> > B,3
> > C,4
> > D,9
> > E,0,0,0,0
> > 
> > because of last row which contains more that two value, and its is throwing org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,
> > 
> > How to handle the above corner case.Could you please suggest some way to handle this.
> > 
> > @Till, Could you please elaborate more which you are suggesting? As per my use case I am dealing with multiple csv files under the given folder and reading line by line using TextInputFormat  and transform will not work by using map operator. Correct me if i'm wrong .
> > 
> > Thanks & Regards,
> > -Deep
> > 
> > 
> > On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <trohrmann@apache.org <ma...@apache.org>> wrote:
> > Hi Deep,
> > 
> > Could you use the TextInputFormat which reads a file line by line? That way
> > you can do the JSON parsing as part of a mapper which consumes the file
> > lines.
> > 
> > Cheers,
> > Till
> > 
> > On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <weizhong0618@gmail.com <ma...@gmail.com>> wrote:
> > 
> > > Hi Deep,
> > >
> > > (redirecting this to user mailing list as this is not a dev question)
> > >
> > > You can try to set the line delimiter and field delimiter of the
> > > RowCsvInputFormat to a non-printing character (assume there is no non-printing
> > > characters in the csv files). It will read all the content of a csv file
> > > into one Row. e.g.
> > >
> > > final StreamExecutionEnvironment env =
> > >    StreamExecutionEnvironment.getExecutionEnvironment();
> > > String path = "test";
> > > TypeInformation[] fieldTypes = new TypeInformation[]{
> > >    BasicTypeInfo.STRING_TYPE_INFO};
> > > RowCsvInputFormat csvFormat =
> > >    new RowCsvInputFormat(new Path(path), fieldTypes);
> > > csvFormat.setNestedFileEnumeration(true);
> > > csvFormat.setDelimiter((char) 0);
> > > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > > DataStream<Row>
> > >    lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> > >    -1);lines.map(value -> value).print();
> > > env.execute();
> > >
> > >
> > > Then you can convert the content of the csv files to json manually.
> > >
> > > Best,
> > > Wei
> > >
> > >
> > > 在 2020年12月7日,19:10,DEEP NARAYAN Singh <about.deep@gmail.com <ma...@gmail.com>> 写道:
> > >
> > > Hi  Guys,
> > >
> > > Below is my code snippet , which read all csv files under the given folder
> > > row by row but my requirement is to read csv file at a time and  convert as
> > > json which will looks like :
> > > {"A":"1","B":"3","C":"4","D":9}
> > >
> > > Csv file data format   :
> > > -------------------------------
> > > *field_id,data,*
> > >
> > >
> > >
> > > *A,1B,3C,4D,9*
> > >
> > > Code snippet:
> > > --------------------------
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > *final StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();String path =
> > > "s3://messages/data/test/dev/2020-12-07/67241306/";TypeInformation[]
> > > fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
> > >  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
> > > RowCsvInputFormat(            new Path(path),
> > >
> > > fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
> > > lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> > > -1);lines.map(value -> value).print();*
> > >
> > >
> > > Any help is highly appreciated.
> > >
> > > Thanks,
> > > -Deep
> > >
> > >
> > >
> 


Re: Urgent help on S3 CSV file reader DataStream Job

Posted by Wei Zhong <we...@gmail.com>.
Hi Deep,

Could you show your current code snippet? I have tried the Csv file data on my local machine and it works fine, so I guess what might be wrong elsewhere.

Best,
Wei


> 在 2020年12月8日,03:20,DEEP NARAYAN Singh <ab...@gmail.com> 写道:
> 
> Hi Wei and Till,
> Thanks for the quick reply.
> 
> @Wei, I tried with code which you have suggested and it is working fine but I have one use case where it is failing, below is the csv input data format :
> Csv file data format   :
> -------------------------------
> field_id,data,
> A,1
> B,3
> C,4
> D,9
> E,0,0,0,0
> 
> because of last row which contains more that two value, and its is throwing org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,
> 
> How to handle the above corner case.Could you please suggest some way to handle this.
> 
> @Till, Could you please elaborate more which you are suggesting? As per my use case I am dealing with multiple csv files under the given folder and reading line by line using TextInputFormat  and transform will not work by using map operator. Correct me if i'm wrong .
> 
> Thanks & Regards,
> -Deep
> 
> 
> On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <tr...@apache.org> wrote:
> Hi Deep,
> 
> Could you use the TextInputFormat which reads a file line by line? That way
> you can do the JSON parsing as part of a mapper which consumes the file
> lines.
> 
> Cheers,
> Till
> 
> On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <we...@gmail.com> wrote:
> 
> > Hi Deep,
> >
> > (redirecting this to user mailing list as this is not a dev question)
> >
> > You can try to set the line delimiter and field delimiter of the
> > RowCsvInputFormat to a non-printing character (assume there is no non-printing
> > characters in the csv files). It will read all the content of a csv file
> > into one Row. e.g.
> >
> > final StreamExecutionEnvironment env =
> >    StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "test";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >    BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat =
> >    new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setNestedFileEnumeration(true);
> > csvFormat.setDelimiter((char) 0);
> > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > DataStream<Row>
> >    lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> >    -1);lines.map(value -> value).print();
> > env.execute();
> >
> >
> > Then you can convert the content of the csv files to json manually.
> >
> > Best,
> > Wei
> >
> >
> > 在 2020年12月7日,19:10,DEEP NARAYAN Singh <ab...@gmail.com> 写道:
> >
> > Hi  Guys,
> >
> > Below is my code snippet , which read all csv files under the given folder
> > row by row but my requirement is to read csv file at a time and  convert as
> > json which will looks like :
> > {"A":"1","B":"3","C":"4","D":9}
> >
> > Csv file data format   :
> > -------------------------------
> > *field_id,data,*
> >
> >
> >
> > *A,1B,3C,4D,9*
> >
> > Code snippet:
> > --------------------------
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > *final StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();String path =
> > "s3://messages/data/test/dev/2020-12-07/67241306/";TypeInformation[]
> > fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
> >  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
> > RowCsvInputFormat(            new Path(path),
> >
> > fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
> > lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> > -1);lines.map(value -> value).print();*
> >
> >
> > Any help is highly appreciated.
> >
> > Thanks,
> > -Deep
> >
> >
> >


Re: Urgent help on S3 CSV file reader DataStream Job

Posted by DEEP NARAYAN Singh <ab...@gmail.com>.
Hi Wei and Till,
Thanks for the quick reply.

*@Wei,* I tried with code which you have suggested and it is working fine
but I have one use case where it is failing, below is the csv input data
format :
Csv file data format   :
-------------------------------
*field_id,data,*



*A,1B,3C,4D,9*
*E,0,0,0,0*

because of last row which contains more that two value, and its is
throwing *org.apache.flink.api.common.io.ParseException:
Row too short: field_id,data,*

How to handle the above corner case.Could you please suggest some way to
handle this.

*@Till,* Could you please elaborate more which you are suggesting? As per
my use case I am dealing with multiple csv files under the given folder and
reading line by line using TextInputFormat  and transform will not work by
using map operator. Correct me if i'm wrong .

Thanks & Regards,
-Deep


On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Deep,
>
> Could you use the TextInputFormat which reads a file line by line? That way
> you can do the JSON parsing as part of a mapper which consumes the file
> lines.
>
> Cheers,
> Till
>
> On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <we...@gmail.com> wrote:
>
> > Hi Deep,
> >
> > (redirecting this to user mailing list as this is not a dev question)
> >
> > You can try to set the line delimiter and field delimiter of the
> > RowCsvInputFormat to a non-printing character (assume there is no
> non-printing
> > characters in the csv files). It will read all the content of a csv file
> > into one Row. e.g.
> >
> > final StreamExecutionEnvironment env =
> >    StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "test";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >    BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat =
> >    new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setNestedFileEnumeration(true);
> > csvFormat.setDelimiter((char) 0);
> > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > DataStream<Row>
> >    lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> >    -1);lines.map(value -> value).print();
> > env.execute();
> >
> >
> > Then you can convert the content of the csv files to json manually.
> >
> > Best,
> > Wei
> >
> >
> > 在 2020年12月7日,19:10,DEEP NARAYAN Singh <ab...@gmail.com> 写道:
> >
> > Hi  Guys,
> >
> > Below is my code snippet , which read all csv files under the given
> folder
> > row by row but my requirement is to read csv file at a time and  convert
> as
> > json which will looks like :
> > {"A":"1","B":"3","C":"4","D":9}
> >
> > Csv file data format   :
> > -------------------------------
> > *field_id,data,*
> >
> >
> >
> > *A,1B,3C,4D,9*
> >
> > Code snippet:
> > --------------------------
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > *final StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();String path =
> > "s3://messages/data/test/dev/2020-12-07/67241306/";TypeInformation[]
> > fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
> >  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
> > RowCsvInputFormat(            new Path(path),
> >
> >
> fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
> > lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> > -1);lines.map(value -> value).print();*
> >
> >
> > Any help is highly appreciated.
> >
> > Thanks,
> > -Deep
> >
> >
> >
>

Re: Urgent help on S3 CSV file reader DataStream Job

Posted by Till Rohrmann <tr...@apache.org>.
Hi Deep,

Could you use the TextInputFormat which reads a file line by line? That way
you can do the JSON parsing as part of a mapper which consumes the file
lines.

Cheers,
Till

On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <we...@gmail.com> wrote:

> Hi Deep,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> You can try to set the line delimiter and field delimiter of the
> RowCsvInputFormat to a non-printing character (assume there is no non-printing
> characters in the csv files). It will read all the content of a csv file
> into one Row. e.g.
>
> final StreamExecutionEnvironment env =
>    StreamExecutionEnvironment.getExecutionEnvironment();
> String path = "test";
> TypeInformation[] fieldTypes = new TypeInformation[]{
>    BasicTypeInfo.STRING_TYPE_INFO};
> RowCsvInputFormat csvFormat =
>    new RowCsvInputFormat(new Path(path), fieldTypes);
> csvFormat.setNestedFileEnumeration(true);
> csvFormat.setDelimiter((char) 0);
> csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> DataStream<Row>
>    lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
>    -1);lines.map(value -> value).print();
> env.execute();
>
>
> Then you can convert the content of the csv files to json manually.
>
> Best,
> Wei
>
>
> 在 2020年12月7日,19:10,DEEP NARAYAN Singh <ab...@gmail.com> 写道:
>
> Hi  Guys,
>
> Below is my code snippet , which read all csv files under the given folder
> row by row but my requirement is to read csv file at a time and  convert as
> json which will looks like :
> {"A":"1","B":"3","C":"4","D":9}
>
> Csv file data format   :
> -------------------------------
> *field_id,data,*
>
>
>
> *A,1B,3C,4D,9*
>
> Code snippet:
> --------------------------
>
>
>
>
>
>
>
>
>
>
>
>
>
> *final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();String path =
> "s3://messages/data/test/dev/2020-12-07/67241306/";TypeInformation[]
> fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
>  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
> RowCsvInputFormat(            new Path(path),
>
> fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> -1);lines.map(value -> value).print();*
>
>
> Any help is highly appreciated.
>
> Thanks,
> -Deep
>
>
>

Re: Urgent help on S3 CSV file reader DataStream Job

Posted by Till Rohrmann <tr...@apache.org>.
Hi Deep,

Could you use the TextInputFormat which reads a file line by line? That way
you can do the JSON parsing as part of a mapper which consumes the file
lines.

Cheers,
Till

On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <we...@gmail.com> wrote:

> Hi Deep,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> You can try to set the line delimiter and field delimiter of the
> RowCsvInputFormat to a non-printing character (assume there is no non-printing
> characters in the csv files). It will read all the content of a csv file
> into one Row. e.g.
>
> final StreamExecutionEnvironment env =
>    StreamExecutionEnvironment.getExecutionEnvironment();
> String path = "test";
> TypeInformation[] fieldTypes = new TypeInformation[]{
>    BasicTypeInfo.STRING_TYPE_INFO};
> RowCsvInputFormat csvFormat =
>    new RowCsvInputFormat(new Path(path), fieldTypes);
> csvFormat.setNestedFileEnumeration(true);
> csvFormat.setDelimiter((char) 0);
> csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> DataStream<Row>
>    lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
>    -1);lines.map(value -> value).print();
> env.execute();
>
>
> Then you can convert the content of the csv files to json manually.
>
> Best,
> Wei
>
>
> 在 2020年12月7日,19:10,DEEP NARAYAN Singh <ab...@gmail.com> 写道:
>
> Hi  Guys,
>
> Below is my code snippet , which read all csv files under the given folder
> row by row but my requirement is to read csv file at a time and  convert as
> json which will looks like :
> {"A":"1","B":"3","C":"4","D":9}
>
> Csv file data format   :
> -------------------------------
> *field_id,data,*
>
>
>
> *A,1B,3C,4D,9*
>
> Code snippet:
> --------------------------
>
>
>
>
>
>
>
>
>
>
>
>
>
> *final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();String path =
> "s3://messages/data/test/dev/2020-12-07/67241306/";TypeInformation[]
> fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
>  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
> RowCsvInputFormat(            new Path(path),
>
> fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> -1);lines.map(value -> value).print();*
>
>
> Any help is highly appreciated.
>
> Thanks,
> -Deep
>
>
>