You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Pankaj Wahane <pa...@qiotec.com> on 2015/08/26 04:55:13 UTC

Question on take function - Spark Java API

Hi community members,


> Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
> 
> Question:
> 
> I have multiple files in a folder and and the first line in each file is name of the asset that the file belongs to. Second line is csv header row and data starts from third row..
> 
> Ex: File 1
> 
> TestAsset01
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,123,456,789
> 11-01-2015 15:00:01,123,456,789
> . . .
> 
> Ex: File 2
> 
> TestAsset02
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,1230,4560,7890
> 11-01-2015 15:00:01,1230,4560,7890
> . . .
> 
> I have got nearly 1000 files in each folder sizing ~10G
> 
> I am using apache spark Java api to read all this files.
> 
> Following is code extract that I am using:
> 
> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>             Map<String, String> readingTypeMap = getReadingTypesMap(sc);
>             //Read File
>             JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
>             //Get Asset
>             String asset = data.take(1).get(0);
>             //Extract Time Series Data
>             JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
>             //Strip header
>             String header = actualData.take(1).get(0);
>             String[] headers = header.split(DELIMERTER);
>             //Extract actual data
>             JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
>             //Extract valid records
>             JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
>             //Find Granularity
>             Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
>             //Transform to TSD objects
>             JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
> 
>             //Save to Cassandra
>             javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>                     "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
> 
>             System.out.println("Total Records: " + timeSeriesLines.count());
>             System.out.println("Valid Records: " + validated.count());
>         }
> Within TimeSeriesData Object I need to set the asset name for the reading, so I need output of data.take(1) to be different for different files.
> 
> 
> Thank You.
> 
> Best Regards,
> Pankaj
> 
> 


-- 


QIO Technologies Limited is a limited company registered in England & Wales 
at 1 Curzon Street, London, England, W1J 5HD, with registered number 
09368431 

This message and the information contained within it is intended solely for 
the addressee and may contain confidential or privileged information. If 
you have received this message in error please notify QIO Technologies 
Limited immediately and then permanently delete this message. If you are 
not the intended addressee then you must not copy, transmit, disclose or 
rely on the information contained in this message or in any attachment to 
it, all such use is prohibited to maximum extent possible by law.

Re: Question on take function - Spark Java API

Posted by Pankaj Wahane <pa...@qiotec.com>.
Thanks Sonal.. I shall try doing that..

> On 26-Aug-2015, at 1:05 pm, Sonal Goyal <so...@gmail.com> wrote:
> 
> You can try using wholeTextFile which will give you a pair rdd of fileName, content. flatMap through this and manipulate the content. 
> 
> Best Regards,
> Sonal
> Founder, Nube Technologies <http://www.nubetech.co/> 
> Check out Reifier at Spark Summit 2015 <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
> 
>  <http://in.linkedin.com/in/sonalgoyal>
> 
> 
> 
> On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane <pankaj.wahane@qiotec.com <ma...@qiotec.com>> wrote:
> Hi community members,
> 
> 
>> Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
>> 
>> Question:
>> 
>> I have multiple files in a folder and and the first line in each file is name of the asset that the file belongs to. Second line is csv header row and data starts from third row..
>> 
>> Ex: File 1
>> 
>> TestAsset01
>> Time,dp_1,dp_2,dp_3
>> 11-01-2015 15:00:00,123,456,789
>> 11-01-2015 15:00:01,123,456,789
>> . . .
>> 
>> Ex: File 2
>> 
>> TestAsset02
>> Time,dp_1,dp_2,dp_3
>> 11-01-2015 15:00:00,1230,4560,7890
>> 11-01-2015 15:00:01,1230,4560,7890
>> . . .
>> 
>> I have got nearly 1000 files in each folder sizing ~10G
>> 
>> I am using apache spark Java api to read all this files.
>> 
>> Following is code extract that I am using:
>> 
>> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>>             Map<String, String> readingTypeMap = getReadingTypesMap(sc);
>>             //Read File
>>             JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
>>             //Get Asset
>>             String asset = data.take(1).get(0);
>>             //Extract Time Series Data
>>             JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
>>             //Strip header
>>             String header = actualData.take(1).get(0);
>>             String[] headers = header.split(DELIMERTER);
>>             //Extract actual data
>>             JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
>>             //Extract valid records
>>             JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
>>             //Find Granularity
>>             Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
>>             //Transform to TSD objects
>>             JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
>> 
>>             //Save to Cassandra
>>             javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>>                     "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
>> 
>>             System.out.println("Total Records: " + timeSeriesLines.count());
>>             System.out.println("Valid Records: " + validated.count());
>>         }
>> Within TimeSeriesData Object I need to set the asset name for the reading, so I need output of data.take(1) to be different for different files.
>> 
>> 
>> Thank You.
>> 
>> Best Regards,
>> Pankaj
>> 
>> 
> 
> 
> QIO Technologies Limited is a limited company registered in England & Wales at 1 Curzon Street, London, England, W1J 5HD, with registered number 09368431 
> 
> This message and the information contained within it is intended solely for the addressee and may contain confidential or privileged information. If you have received this message in error please notify QIO Technologies Limited immediately and then permanently delete this message. If you are not the intended addressee then you must not copy, transmit, disclose or rely on the information contained in this message or in any attachment to it, all such use is prohibited to maximum extent possible by law.
> 
> 


-- 


QIO Technologies Limited is a limited company registered in England & Wales 
at 1 Curzon Street, London, England, W1J 5HD, with registered number 
09368431 

This message and the information contained within it is intended solely for 
the addressee and may contain confidential or privileged information. If 
you have received this message in error please notify QIO Technologies 
Limited immediately and then permanently delete this message. If you are 
not the intended addressee then you must not copy, transmit, disclose or 
rely on the information contained in this message or in any attachment to 
it, all such use is prohibited to maximum extent possible by law.

Re: Question on take function - Spark Java API

Posted by Sonal Goyal <so...@gmail.com>.
You can try using wholeTextFile which will give you a pair rdd of fileName,
content. flatMap through this and manipulate the content.

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Check out Reifier at Spark Summit 2015
<https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>

<http://in.linkedin.com/in/sonalgoyal>



On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane <pa...@qiotec.com>
wrote:

> Hi community members,
>
>
> Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
>
> *Question:*
>
> I have multiple files in a folder and and the first line in each file is
> name of the asset that the file belongs to. Second line is csv header row
> and data starts from third row..
>
> Ex: File 1
>
> TestAsset01
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,123,456,789
> 11-01-2015 15:00:01,123,456,789
> . . .
>
> Ex: File 2
>
> TestAsset02
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,1230,4560,7890
> 11-01-2015 15:00:01,1230,4560,7890
> . . .
>
> I have got nearly 1000 files in each folder sizing ~10G
>
> I am using apache spark Java api to read all this files.
>
> Following is code extract that I am using:
>
> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>             Map<String, String> readingTypeMap = getReadingTypesMap(sc);
>             //Read File
>             JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
>             //Get Asset
>             String asset = data.take(1).get(0);
>             //Extract Time Series Data
>             JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
>             //Strip header
>             String header = actualData.take(1).get(0);
>             String[] headers = header.split(DELIMERTER);
>             //Extract actual data
>             JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
>             //Extract valid records
>             JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
>             //Find Granularity
>             Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
>             //Transform to TSD objects
>             JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
>
>             //Save to Cassandra
>             javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>                     "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
>
>             System.out.println("Total Records: " + timeSeriesLines.count());
>             System.out.println("Valid Records: " + validated.count());
>         }
>
> Within TimeSeriesData Object I need to set the asset name for the reading,
> so I need output of data.take(1) to be different for different files.
>
> Thank You.
>
> Best Regards,
> Pankaj
>
>
>
>
> QIO Technologies Limited is a limited company registered in England &
> Wales at 1 Curzon Street, London, England, W1J 5HD, with registered number
> 09368431
>
> This message and the information contained within it is intended solely
> for the addressee and may contain confidential or privileged information.
> If you have received this message in error please notify QIO Technologies
> Limited immediately and then permanently delete this message. If you are
> not the intended addressee then you must not copy, transmit, disclose or
> rely on the information contained in this message or in any attachment to
> it, all such use is prohibited to maximum extent possible by law.
>