You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lokesh R <lo...@ericsson.com> on 2017/08/23 18:07:22 UTC

Apache Flink Reading CSV Files ,Transform and Writting Back to CSV using Paralliesm

Hi Team,

I am using Apache Flink with Java for the following problem statement:

1. Read a CSV file whose field delimiter is the character ';'.
2. Transform the fields.
3. Write the data back to CSV.

my doubts are as below

1. If I need to read a CSV file larger than 50 GB, what would be the approach?
2. If I use parallelism, I am not able to split the data and collect it, since it is a CSV file;
and while writing back to CSV, multiple files are created to write the data when the default parallelism is used. How can I achieve the same?

sample input is
000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940

and sample output is

000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940


below is the code which i am currently running on local environment

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package com.ericsson.voucher;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple8;

import org.apache.flink.util.Collector;


/**
 * Batch job that reads ';'-separated voucher records, rewrites some of the
 * fields and writes the result back out as a ';'-separated CSV file.
 *
 * <p>Usage: {@code Classification [inputPath [outputPath]]}. When an argument
 * is omitted the corresponding default path below is used.
 */
public class Classification {

    /** Default input file, used when no input path is given on the command line. */
    private static final String INPUT_PATH = "C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv";

    /** Default output file, used when no output path is given on the command line. */
    private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";

    /** Number of ';'-separated fields every well-formed input record must have. */
    private static final int FIELD_COUNT = 8;

    /**
     * Builds and runs the read / transform / write pipeline and prints the
     * elapsed wall-clock time.
     *
     * @param args optional: {@code args[0]} input path, {@code args[1]} output path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        String inputPath = args.length > 0 ? args[0] : INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : OUTPUT_PATH;

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(20);

        long subViewStartTime = System.currentTimeMillis();

        // No field delimiter is configured here, so the reader's default applies and
        // only the first field of each line is requested. The ';'-separated record
        // therefore arrives in f0 and is split by DataExtractor.
        // NOTE(review): this relies on the input lines not containing the reader's
        // default field delimiter - confirm against the real data.
        DataSet<Tuple1<String>> rawdata = env
                .readCsvFile(inputPath)
                .lineDelimiter("\n")
                .types(String.class);

        // The transformation runs with the environment parallelism (20). No rebalance()
        // is applied afterwards: the only consumer is the single-parallelism sink below,
        // so redistributing the records first would only add a shuffle.
        DataSet<Tuple8<String, String, String, String, String, String, String, String>> mails =
                rawdata.flatMap(new DataExtractor());

        // Parallelism 1 on the sink makes a single writer produce the output.
        mails.writeAsCsv(outputPath, "\n", ";").setParallelism(1);

        // Trigger the job explicitly. Using print() for this would gather every record
        // back to the client, which does not scale to multi-gigabyte inputs.
        env.execute("Classification");

        long subViewEndTime = System.currentTimeMillis();
        long subViewDifference = subViewEndTime - subViewStartTime;

        System.out.println("The Difference Time is"+ subViewDifference/1000 +"seconds");
    }

    /**
     * Splits one raw input line on ';' and emits the transformed eight-field record.
     * Lines that do not contain at least {@link #FIELD_COUNT} fields are skipped
     * instead of failing the whole job.
     */
    public static class DataExtractor
            extends
            RichFlatMapFunction<Tuple1<String>, Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = 1L;

        /**
         * Transforms a single input line.
         *
         * @param paramIN tuple whose {@code f0} holds the raw ';'-separated line
         * @param out     collector receiving zero or one transformed record
         * @throws Exception declared by the Flink interface; not thrown for short lines
         */
        @Override
        public void flatMap(
                Tuple1<String> paramIN,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
                throws Exception {
            String line = paramIN.f0;
            if (line == null) {
                return;
            }

            // A limit of -1 keeps trailing empty strings, so a record whose last
            // column is empty still yields FIELD_COUNT fields.
            String[] fields = line.split(";", -1);
            if (fields.length < FIELD_COUNT) {
                // Malformed or blank line: nothing to emit.
                return;
            }

            String voucherCode = fields[0] + "TEST1";
            String voucherId = fields[1];
            String voucherNumber = fields[2];
            String status = fields[3] + "TWTSTST";
            String startDate = fields[4];
            String endDate = fields[5];
            String endStatus = fields[6];
            String endVoucherNumber = fields[7];

            out.collect(new Tuple8<String, String, String, String, String, String, String, String>(
                    voucherCode, voucherId, voucherNumber, status,
                    startDate, endDate, endStatus, endVoucherNumber));
        }

    }

    /**
     * Group-reduce function that forwards every record of a group unchanged,
     * emitting a fresh tuple per input record. It is not wired into the
     * pipeline built by {@code main}.
     */
    public static class RecordReducer
            implements
            GroupReduceFunction<Tuple8<String, String, String, String, String, String, String, String>,
            Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = -6045821605365596025L;

        /**
         * Emits a copy of each record in the group.
         *
         * @param paramIterable  records of the current group
         * @param paramCollector collector receiving one output record per input record
         * @throws Exception declared by the Flink interface
         */
        @Override
        public void reduce(
                Iterable<Tuple8<String, String, String, String, String, String, String, String>> paramIterable,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> paramCollector)
                throws Exception {
            for (Tuple8<String, String, String, String, String, String, String, String> m : paramIterable) {
                // Emit a new tuple rather than the iterated instance, as the original did.
                paramCollector
                        .collect(new Tuple8<String, String, String, String, String, String, String, String>(
                                m.f0, m.f1, m.f2, m.f3, m.f4, m.f5, m.f6, m.f7));
            }
        }
    }

}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Please help me with this: how can I achieve partitioning of the fields in the above data and use parallelism to increase the throughput of my application?




Re: Apache Flink Reading CSV Files ,Transform and Writting Back to CSV using Paralliesm

Posted by Lokesh Gowda <lo...@gmail.com>.
Hi Robert, my question was: if I need to read and write a CSV file whose
size will be in GB, how can I distribute the data sink so that it writes
files of exactly 1 GB each? Since I am
new to Flink, I am not sure about this.


Regards
Lokesh.r

On Sat, Aug 26, 2017 at 2:56 AM Robert Metzger <rm...@apache.org> wrote:

> Hi Lokesh,
>
> I'm not sure if I fully understood your question. But you can not write
> the result in a single file from multiple writers.
> If you want to process the data fully distributed, you'll also have to
> write it distributed.
>
> On Wed, Aug 23, 2017 at 8:07 PM, Lokesh R <lo...@ericsson.com> wrote:
>
>> Hi Team,
>>
>> I am using the apache flink with java for below problem statement
>>
>> 1.where i will read a csv file with field delimeter  character ;
>> 2.transform the fields
>> 3.write back the data back to csv
>>
>> my doubts are as below
>>
>> 1. if i need to read the csv file of size above 50 gb what would be the
>> approach
>> 2 if i use Parallelism i am not able to split the data and collect it
>> since its a csv file
>> and while writing a back to csv its creating a multiple files to write
>> the data using the default Parallelism how can achieve the same
>>
>> sample input is
>>
>> 000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>>
>> and sample output is
>>
>>
>> 000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>>
>>
>> below is the code which i am currently running on local environment
>>
>> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>
>> package com.ericsson.voucher;
>>
>> import org.apache.flink.api.common.functions.GroupReduceFunction;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple8;
>>
>> import org.apache.flink.util.Collector;
>>
>>
>> public class Classification {
>>
>>     private static final String OUTPUT_PATH =
>> "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";
>>
>>     public static void main(String[] args) throws Exception {
>>
>>
>>         ExecutionEnvironment env = ExecutionEnvironment
>>                 .getExecutionEnvironment();
>>         env.setParallelism(20);
>>
>>          long subViewStartTime = System.currentTimeMillis();
>>         DataSet<Tuple1<String>> rawdata = (DataSet<Tuple1<String>>) env
>>                 .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
>>                 .lineDelimiter("\n").types(String.class);
>>     DataSet<Tuple8<String,String, String, String, String, String, String,
>> String>> mails = rawdata
>>                 .flatMap(new DataExtractor()).rebalance();
>>                 mails.writeAsCsv(OUTPUT_PATH, "\n",
>> ";").setParallelism(1);
>>            mails.print();
>>            long subViewEndTime = System.currentTimeMillis();
>>
>>            long subViewDifference = subViewEndTime - subViewStartTime;
>>
>>            System.out.println("The Difference Time is"+
>> subViewDifference/1000 +"seconds");
>>
>>     }
>>
>>     public static class DataExtractor
>>             extends
>>             RichFlatMapFunction<Tuple1<String>, Tuple8<String, String,
>> String, String, String, String, String, String>> {
>>
>>         /**
>>          *
>>          */
>>         private static final long serialVersionUID = 1L;
>>
>>         public void flatMap(
>>                 Tuple1<String> paramIN,
>>                 org.apache.flink.util.Collector<Tuple8<String, String,
>> String, String, String, String, String, String>> out)
>>                 throws Exception {
>>             String[] lines = paramIN.f0.split(";");
>>             if (lines != null && lines.length > 0) {
>>                 String vocuherCode =lines[0];
>>                 vocuherCode=vocuherCode+"TEST1";
>>                 String VoucherId =  lines[1];
>>                 String voucherNumber = lines[2];
>>                 String status = lines[3]+"TWTSTST";
>>                 String startDate = lines[4] + "";
>>                 String endDate = lines[5] + "";
>>                 String endStatus = lines[6];
>>                 String endVoucherNumber = lines[7];
>>
>>
>>
>>
>>
>>             out.collect(new Tuple8<String, String, String, String,
>> String, String, String, String>(
>>                     vocuherCode, VoucherId, voucherNumber, status,
>>                     startDate, endDate, endStatus, endVoucherNumber));
>>             }
>>
>>         }
>>
>>     }
>>
>>     public static class RecordReducer
>>             implements
>>             GroupReduceFunction<Tuple8<String, String, String, String,
>> String, String, String, String>,
>>             Tuple8<String, String, String, String, String, String,
>> String, String>> {
>>
>>
>>         /**
>>          *
>>          */
>>         private static final long serialVersionUID =
>> -6045821605365596025L;
>>
>>         @Override
>>         public void reduce(
>>                 Iterable<Tuple8<String, String, String, String, String,
>> String, String, String>> paramIterable,
>>                 Collector<Tuple8<String, String, String, String, String,
>> String, String, String>> paramCollector)
>>                 throws Exception {
>>             // TODO Auto-generated method stub
>>             String vocuherCode = null;
>>             String VoucherId = null;
>>             String voucherNumber = null;
>>             String status = null;
>>             String startDate = null;
>>             String endDate = null;
>>             String endStatus = null;
>>             String endVoucherNumber = null;
>>             for (Tuple8<String, String, String, String, String, String,
>> String, String> m : paramIterable) {
>>                 vocuherCode = m.f0;
>>                 VoucherId = m.f1;
>>                 voucherNumber = m.f2;
>>                 status = m.f3;
>>                 startDate = m.f4;
>>                 endDate = m.f5;
>>                 endStatus = m.f6;
>>                 endVoucherNumber = m.f7;
>>                 paramCollector
>>                 .collect(new Tuple8<String, String, String, String,
>> String, String, String, String>(
>>                         vocuherCode, VoucherId, voucherNumber, status,
>>                         startDate, endDate, endStatus, endVoucherNumber));
>>
>>             }
>>
>>
>>
>>         }
>>     }
>>
>>
>> }
>>
>> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>
>> please help me on the same how can I achieve the portioning of fields on
>> the above data and achieve the parallism to increase the throughput of my
>> application
>>
>>
>>
>>
>>
>>
>>
>
>

Re: Apache Flink Reading CSV Files ,Transform and Writting Back to CSV using Paralliesm

Posted by Robert Metzger <rm...@apache.org>.
Hi Lokesh,

I'm not sure if I fully understood your question. But you can not write the
result in a single file from multiple writers.
If you want to process the data fully distributed, you'll also have to
write it distributed.

On Wed, Aug 23, 2017 at 8:07 PM, Lokesh R <lo...@ericsson.com> wrote:

> Hi Team,
>
> I am using the apache flink with java for below problem statement
>
> 1.where i will read a csv file with field delimeter  character ;
> 2.transform the fields
> 3.write back the data back to csv
>
> my doubts are as below
>
> 1. if i need to read the csv file of size above 50 gb what would be the
> approach
> 2 if i use Parallelism i am not able to split the data and collect it
> since its a csv file
> and while writing a back to csv its creating a multiple files to write the
> data using the default Parallelism how can achieve the same
>
> sample input is
> 000008000077;151139924603;3526358005322;2;29/07/2016:00:
> 00:00;29/07/2018:00:00:00;20;4800019940
>
> and sample output is
>
> 000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/
> 07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>
>
> below is the code which i am currently running on local environment
>
> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> package com.ericsson.voucher;
>
> import org.apache.flink.api.common.functions.GroupReduceFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple8;
>
> import org.apache.flink.util.Collector;
>
>
> public class Classification {
>
>     private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\
> OutPut\\output.csv";
>
>     public static void main(String[] args) throws Exception {
>
>
>         ExecutionEnvironment env = ExecutionEnvironment
>                 .getExecutionEnvironment();
>         env.setParallelism(20);
>
>          long subViewStartTime = System.currentTimeMillis();
>         DataSet<Tuple1<String>> rawdata = (DataSet<Tuple1<String>>) env
>                 .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
>                 .lineDelimiter("\n").types(String.class);
>     DataSet<Tuple8<String,String, String, String, String, String, String,
> String>> mails = rawdata
>                 .flatMap(new DataExtractor()).rebalance();
>                 mails.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);
>            mails.print();
>            long subViewEndTime = System.currentTimeMillis();
>
>            long subViewDifference = subViewEndTime - subViewStartTime;
>
>            System.out.println("The Difference Time is"+
> subViewDifference/1000 +"seconds");
>
>     }
>
>     public static class DataExtractor
>             extends
>             RichFlatMapFunction<Tuple1<String>, Tuple8<String, String,
> String, String, String, String, String, String>> {
>
>         /**
>          *
>          */
>         private static final long serialVersionUID = 1L;
>
>         public void flatMap(
>                 Tuple1<String> paramIN,
>                 org.apache.flink.util.Collector<Tuple8<String, String,
> String, String, String, String, String, String>> out)
>                 throws Exception {
>             String[] lines = paramIN.f0.split(";");
>             if (lines != null && lines.length > 0) {
>                 String vocuherCode =lines[0];
>                 vocuherCode=vocuherCode+"TEST1";
>                 String VoucherId =  lines[1];
>                 String voucherNumber = lines[2];
>                 String status = lines[3]+"TWTSTST";
>                 String startDate = lines[4] + "";
>                 String endDate = lines[5] + "";
>                 String endStatus = lines[6];
>                 String endVoucherNumber = lines[7];
>
>
>
>
>
>             out.collect(new Tuple8<String, String, String, String, String,
> String, String, String>(
>                     vocuherCode, VoucherId, voucherNumber, status,
>                     startDate, endDate, endStatus, endVoucherNumber));
>             }
>
>         }
>
>     }
>
>     public static class RecordReducer
>             implements
>             GroupReduceFunction<Tuple8<String, String, String, String,
> String, String, String, String>,
>             Tuple8<String, String, String, String, String, String, String,
> String>> {
>
>
>         /**
>          *
>          */
>         private static final long serialVersionUID = -6045821605365596025L;
>
>         @Override
>         public void reduce(
>                 Iterable<Tuple8<String, String, String, String, String,
> String, String, String>> paramIterable,
>                 Collector<Tuple8<String, String, String, String, String,
> String, String, String>> paramCollector)
>                 throws Exception {
>             // TODO Auto-generated method stub
>             String vocuherCode = null;
>             String VoucherId = null;
>             String voucherNumber = null;
>             String status = null;
>             String startDate = null;
>             String endDate = null;
>             String endStatus = null;
>             String endVoucherNumber = null;
>             for (Tuple8<String, String, String, String, String, String,
> String, String> m : paramIterable) {
>                 vocuherCode = m.f0;
>                 VoucherId = m.f1;
>                 voucherNumber = m.f2;
>                 status = m.f3;
>                 startDate = m.f4;
>                 endDate = m.f5;
>                 endStatus = m.f6;
>                 endVoucherNumber = m.f7;
>                 paramCollector
>                 .collect(new Tuple8<String, String, String, String,
> String, String, String, String>(
>                         vocuherCode, VoucherId, voucherNumber, status,
>                         startDate, endDate, endStatus, endVoucherNumber));
>
>             }
>
>
>
>         }
>     }
>
>
> }
>
> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> please help me on the same how can I achieve the portioning of fields on
> the above data and achieve the parallism to increase the throughput of my
> application
>
>
>
>
>
>
>