Posted to issues@spark.apache.org by "Aamir Abbas (JIRA)" <ji...@apache.org> on 2016/06/14 09:49:01 UTC

[jira] [Comment Edited] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

    [ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329201#comment-15329201 ] 

Aamir Abbas edited comment on SPARK-15919 at 6/14/16 9:49 AM:
--------------------------------------------------------------

Here's the updated code, after switching to the foreachRDD function.

/****************************************************************************/
// Requires: org.apache.spark.api.java.JavaRDD,
// org.apache.spark.api.java.function.VoidFunction,
// java.text.DecimalFormat, and org.joda.time.DateTime (Joda-Time).
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {

	private static final long serialVersionUID = -2508020533798690700L;
	// These fields are initialized once, when this anonymous function object
	// is constructed, so the same path components are reused for every batch.
	private int random = (int) (Math.random() * 256);
	private String pathPrefix = getBaseOutputPathPrefix();
	private String pathSuffix = getBaseOutputPathSuffix();

	@Override
	public void call(JavaRDD<String> s) {
		String path = pathPrefix + random + pathSuffix;
		s.saveAsTextFile(path); // writes part-* files under 'path'
	}

	private String getBaseOutputPathPrefix() {
		return "s3://bucket/folder/";
	}

	// Builds a "/yyyyMMddHHmmss/" suffix from the current wall-clock time.
	private String getBaseOutputPathSuffix() {
		DecimalFormat format = new DecimalFormat("00");
		DateTime dateTime = new DateTime();

		return "/" + dateTime.getYear() + format.format(dateTime.getMonthOfYear())
			+ format.format(dateTime.getDayOfMonth()) + format.format(dateTime.getHourOfDay())
			+ format.format(dateTime.getMinuteOfHour()) + format.format(dateTime.getSecondOfMinute()) + "/";
	}

});
/****************************************************************************/
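
For comparison, a fresh path for every batch can be obtained by building the path inside call(), so that it is re-evaluated each time a batch arrives. A minimal sketch under that assumption (same helpers and imports as above; not tested here):

/****************************************************************************/
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {

	private static final long serialVersionUID = 1L;

	@Override
	public void call(JavaRDD<String> s) {
		// Re-computed on every batch, so each batch gets its own directory.
		int random = (int) (Math.random() * 256);
		String path = getBaseOutputPathPrefix() + random + getBaseOutputPathSuffix();
		s.saveAsTextFile(path);
	}

	private String getBaseOutputPathPrefix() {
		return "s3://bucket/folder/";
	}

	private String getBaseOutputPathSuffix() {
		// Same "/yyyyMMddHHmmss/" logic as in the block above.
		DecimalFormat format = new DecimalFormat("00");
		DateTime dateTime = new DateTime();
		return "/" + dateTime.getYear() + format.format(dateTime.getMonthOfYear())
			+ format.format(dateTime.getDayOfMonth()) + format.format(dateTime.getHourOfDay())
			+ format.format(dateTime.getMinuteOfHour()) + format.format(dateTime.getSecondOfMinute()) + "/";
	}
});
/****************************************************************************/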

The above code saves the output at the following location, writing part- files where each file contains one RDD.

s3://bucket/folder/random_number/date_time/

The expected behaviour is to save the output at that same location, but with all of the RDDs in one file.

For multiple batches of data, the output should be as follows:

s3://bucket/folder/random_number_1/date_time_1/
s3://bucket/folder/random_number_2/date_time_2/
s3://bucket/folder/random_number_3/date_time_3/
s3://bucket/folder/random_number_1/date_time_4/

and so on. I executed this code as a Spark job on an AWS EMR cluster.
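
Combined with the per-batch path sketch above, the "all RDDs in one file" part can be expressed by coalescing each batch RDD to a single partition before saving. A one-line sketch of the save call inside call() (funnelling a whole batch through one partition can be slow for large batches):

/****************************************************************************/
s.coalesce(1).saveAsTextFile(path); // one partition => a single part-00000 file
/****************************************************************************/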

Please let me know if you need me to add more explanation anywhere.


> DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
> ------------------------------------------------------------------------
>
>                 Key: SPARK-15919
>                 URL: https://issues.apache.org/jira/browse/SPARK-15919
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.6.1
>         Environment: Amazon EMR
>            Reporter: Aamir Abbas
>
> I have a Spark streaming job that reads a data stream and saves it as a text file after a predefined time interval, using the following call:
> stream.dstream().repartition(1).saveAsTextFiles(getOutputPath(), "");
> The function getOutputPath() generates a new path every time it is called, based on the current system time.
> However, the output path prefix remains the same for all batches, which effectively means the function is not called again for the next batch of the stream, even though files are being saved after each checkpoint interval.
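
For context on the behaviour described above: the prefix argument to saveAsTextFiles is an ordinary Java expression, so getOutputPath() runs exactly once, when the streaming graph is constructed, not once per batch. A minimal sketch of the distinction (assuming a Java 8 lambda and the same getOutputPath() helper):

/****************************************************************************/
// Evaluated once, at graph-construction time: every batch reuses this prefix.
stream.dstream().repartition(1).saveAsTextFiles(getOutputPath(), "");

// Evaluated once per batch: foreachRDD re-runs its body at each interval.
javaStream.foreachRDD(rdd -> rdd.repartition(1).saveAsTextFile(getOutputPath()));
/****************************************************************************/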



