Posted to user@flink.apache.org by Lokesh R <lo...@ericsson.com> on 2017/08/23 18:07:22 UTC
Apache Flink: Reading CSV Files, Transforming, and Writing Back to CSV Using Parallelism
Hi Team,

I am using Apache Flink with Java for the problem statement below:

1. Read a CSV file with the field delimiter character ";".
2. Transform the fields.
3. Write the data back to CSV.

My doubts are as follows:

1. If I need to read a CSV file larger than 50 GB, what would be the approach?
2. If I use parallelism, I am not able to split the data and collect it, since it is a CSV file; and while writing back to CSV, multiple files are created under the default parallelism. How can I achieve this?

Sample input:

000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940

Sample output:

000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940

Below is the code I am currently running in a local environment:
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
package com.ericsson.voucher;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.util.Collector;

public class Classification {

    private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(20);

        long subViewStartTime = System.currentTimeMillis();

        // Read each line as a single String field; DataExtractor splits the fields.
        DataSet<Tuple1<String>> rawData = env
                .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
                .lineDelimiter("\n")
                .types(String.class);

        DataSet<Tuple8<String, String, String, String, String, String, String, String>> mails =
                rawData.flatMap(new DataExtractor()).rebalance();

        // setParallelism(1) forces a single output file but funnels the whole
        // write through one task; with the default parallelism the path would
        // become a directory of part files instead.
        mails.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);

        // print() would pull the entire result back to the client, which does
        // not scale to a 50 GB input; execute() triggers the job instead.
        env.execute("voucher classification");

        long subViewEndTime = System.currentTimeMillis();
        long subViewDifference = subViewEndTime - subViewStartTime;
        System.out.println("The difference time is " + subViewDifference / 1000 + " seconds");
    }

    public static class DataExtractor extends
            RichFlatMapFunction<Tuple1<String>, Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(Tuple1<String> paramIN,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
                throws Exception {
            String[] fields = paramIN.f0.split(";");
            // Guard against short rows: fields[7] requires at least eight fields.
            if (fields.length >= 8) {
                String voucherCode = fields[0] + "TEST1";
                String voucherId = fields[1];
                String voucherNumber = fields[2];
                String status = fields[3] + "TWTSTST";
                String startDate = fields[4];
                String endDate = fields[5];
                String endStatus = fields[6];
                String endVoucherNumber = fields[7];
                out.collect(new Tuple8<>(voucherCode, voucherId, voucherNumber, status,
                        startDate, endDate, endStatus, endVoucherNumber));
            }
        }
    }

    // Currently unused in main(); forwards each record of a group unchanged.
    public static class RecordReducer implements
            GroupReduceFunction<Tuple8<String, String, String, String, String, String, String, String>,
                    Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = -6045821605365596025L;

        @Override
        public void reduce(
                Iterable<Tuple8<String, String, String, String, String, String, String, String>> records,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
                throws Exception {
            for (Tuple8<String, String, String, String, String, String, String, String> record : records) {
                out.collect(record);
            }
        }
    }
}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Please help me with this: how can I achieve the partitioning of fields on the above data, and use parallelism to increase the throughput of my application?
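
A minimal sketch of one way to read the fields in parallel, assuming the Flink 1.x DataSet API: readCsvFile can split on the ";" field delimiter directly, so each line is parsed into a Tuple8 without a manual split(), and a large file is divided into input splits that all parallel source tasks read concurrently. The paths are the ones from the post above; the job name is illustrative.

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple8;

public class DirectCsvRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // fieldDelimiter(";") lets the CSV reader parse the eight fields itself;
        // the file is read in parallel via input splits, one range per source task.
        DataSet<Tuple8<String, String, String, String, String, String, String, String>> records =
                env.readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
                   .fieldDelimiter(";")
                   .types(String.class, String.class, String.class, String.class,
                          String.class, String.class, String.class, String.class);

        // A transform (e.g. the DataExtractor flatMap above) would plug in here.

        // With parallelism > 1 the output path becomes a directory holding
        // one part file per parallel writer.
        records.writeAsCsv("C:\\Projects\\DM\\Pentaho\\OutPut", "\n", ";");
        env.execute("direct csv read");
    }
}

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++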
Re: Apache Flink: Reading CSV Files, Transforming, and Writing Back to CSV Using Parallelism
Posted by Lokesh Gowda <lo...@gmail.com>.
Hi Robert, my question was: if I need to read and write a CSV file whose size will be in the GB range, how can I distribute the data sink so that it writes files of exactly 1 GB? Since I am new to Flink, I am not sure about this.

Regards
Lokesh.r
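
One rough way to approximate 1 GB files with the DataSet API, which has no built-in size-based file rolling: derive the sink parallelism from the input size so that each part file lands near 1 GB. This is a sketch of my own, not a Flink feature; it assumes the input is a local file whose size is measurable up front, reuses the `mails` DataSet and OUTPUT_PATH from the job above, and the resulting file sizes are only approximate since they depend on how evenly records are spread across writers.

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

import java.io.File;

// Output size is assumed to be close to the input size, since the
// transform only appends short suffixes to two fields.
long inputBytes = new File("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv").length();
long targetBytes = 1024L * 1024L * 1024L; // ~1 GB per part file

// Ceiling division: enough writers that no part file exceeds ~1 GB.
int sinkParallelism = (int) Math.max(1, (inputBytes + targetBytes - 1) / targetBytes);

// rebalance() spreads records evenly, so the part files come out similar in size.
mails.rebalance()
     .writeAsCsv(OUTPUT_PATH, "\n", ";")
     .setParallelism(sinkParallelism);

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++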
On Sat, Aug 26, 2017 at 2:56 AM Robert Metzger <rm...@apache.org> wrote:
> Hi Lokesh,
>
> I'm not sure if I fully understood your question, but you cannot write
> the result into a single file from multiple writers.
> If you want to process the data in a fully distributed way, you'll also
> have to write it in a distributed way.
>
Re: Apache Flink: Reading CSV Files, Transforming, and Writing Back to CSV Using Parallelism
Posted by Robert Metzger <rm...@apache.org>.
Hi Lokesh,
I'm not sure if I fully understood your question, but you cannot write the result into a single file from multiple writers.
If you want to process the data in a fully distributed way, you'll also have to write it in a distributed way.
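
As a concrete sketch of the two options, reusing the `mails` DataSet and paths from the original post (register only one of these sinks per job):

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

// Option 1: distributed write. The path becomes a directory containing one
// part file per parallel writer, and the write runs at full parallelism.
mails.writeAsCsv("C:\\Projects\\DM\\Pentaho\\OutPut", "\n", ";");

// Option 2: single-file write. One writer produces one file, but the whole
// output is funneled through a single task, so the write is not parallel.
mails.writeAsCsv("C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv", "\n", ";")
     .setParallelism(1);

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

If a single final file is required, a common pattern is to write in a distributed way and concatenate the part files afterwards, outside of Flink.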