Posted to user@spark.apache.org by Yiannis Gkoufas <jo...@gmail.com> on 2015/07/15 02:23:36 UTC

Sorted Multiple Outputs

Hi there,

I have been using the approach described here:

http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

In addition to that, I was wondering if there is a way to customize the
order of the values contained in each file.

Thanks a lot!

Re: Sorted Multiple Outputs

Posted by Yiannis Gkoufas <jo...@gmail.com>.
Hi Eugene,

in my case the list of values that I want to sort and write to a separate
file is fairly small, so I solved it the following way:

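// Needs: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.{FileSystem, Path},
//        java.io.{BufferedOutputStream, PrintWriter}
.groupByKey().foreach { case (key, values) =>
  // Runs on the executors: each key gets its own file under rootPath
  val hdfs = FileSystem.get(new Configuration())
  val dstream = hdfs.create(new Path(rootPath + "/" + key))
  val writer = new PrintWriter(new BufferedOutputStream(dstream, 100 * 1024))
  try {
    // The values of one key are small enough to sort in memory
    values.toList.sortBy(_._1).foreach { sub =>
      writer.println(Utils.getDateStr(sub._1) + "," + sub._2 + "," + sub._3)
    }
  } finally {
    writer.close() // close() also flushes the buffered output
  }
}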


I am not sure what I changed in the way I write to HDFS, but this approach worked.
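
The sort-and-write step can also be pulled out into a small helper that is testable without a cluster. What follows is only a sketch under stated assumptions: `SortedWriter` and `writeSorted` are hypothetical names, the record type `(Long, String, String)` is a guess at the shape of the values (the first field standing in for a timestamp), and the date formatting done by `Utils.getDateStr` in the snippet above is left out.

```scala
import java.io.{BufferedOutputStream, OutputStream, PrintWriter}

object SortedWriter {
  // Writes one comma-separated line per record, ordered by the first field.
  // The (Long, String, String) record shape is an assumption for illustration.
  def writeSorted(records: Iterable[(Long, String, String)], out: OutputStream): Unit = {
    val writer = new PrintWriter(new BufferedOutputStream(out, 100 * 1024))
    try {
      // Sorting in memory is only reasonable while each key's values stay small.
      records.toSeq.sortBy(_._1).foreach { case (ts, a, b) =>
        writer.println(s"$ts,$a,$b")
      }
    } finally {
      writer.close() // flushes the buffer and closes the underlying stream
    }
  }
}
```

Because the helper only needs a `java.io.OutputStream`, it should accept the stream returned by `hdfs.create(...)` as well as a local file or an in-memory buffer.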


Thanks a lot!


On 13 August 2015 at 01:06, Eugene Morozov <fa...@list.ru> wrote:

> Yiannis,
>
> sorry for the late response.
> It is indeed not possible to create a new RDD inside foreachPartition,
> so you have to write the data manually. I haven’t tried that and haven’t
> hit such an exception, but you might try writing locally and then
> uploading the file to HDFS. FileSystem has a specific method for that:
> “copyFromLocalFile”.
>
> Another approach would be to split the RDD into multiple RDDs by key.
> You can get the distinct keys, collect them on the driver, loop over the
> keys, and filter a new RDD out of the original one for each key.
>
> for (key <- keys) {
>     rdd.filter(_._1 == key).saveAsTextFile(rootPath + "/" + key)
> }
>
> It might help to cache the original RDD.

Re: Sorted Multiple Outputs

Posted by Eugene Morozov <fa...@list.ru>.
Yiannis, 

sorry for the late response.
It is indeed not possible to create a new RDD inside foreachPartition, so you have to write the data manually. I haven’t tried that and haven’t hit such an exception, but you might try writing locally and then uploading the file to HDFS. FileSystem has a specific method for that: “copyFromLocalFile”.

Another approach would be to split the RDD into multiple RDDs by key. You can get the distinct keys, collect them on the driver, loop over the keys, and filter a new RDD out of the original one for each key.

for (key <- keys) {
    rdd.filter(_._1 == key).saveAsTextFile(rootPath + "/" + key)
}

It might help to cache the original RDD.
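
A slightly fuller sketch of the filter-per-key loop follows, written in Scala like the other snippets in this thread. The names `SplitByKey` and `saveEachKey`, the `(String, String)` element type, and sorting by the value itself are all assumptions made for illustration. It relies on the pair-RDD implicits being in scope automatically, which should hold from Spark 1.3 on; older releases may need `import org.apache.spark.SparkContext._`.

```scala
import org.apache.spark.rdd.RDD

object SplitByKey {
  // Saves one output directory per key under rootPath, with the values sorted.
  def saveEachKey(pairs: RDD[(String, String)], rootPath: String): Unit = {
    pairs.cache() // the same RDD is scanned once per key below
    // Assumes the set of distinct keys is small enough to collect on the driver.
    val keys = pairs.keys.distinct().collect()
    for (key <- keys) {
      pairs
        .filter { case (k, _) => k == key }
        .values
        .sortBy(v => v)
        .saveAsTextFile(s"$rootPath/$key")
    }
    pairs.unpersist()
  }
}
```

Note that `saveAsTextFile` writes a directory of part files rather than a single file, and that this runs one Spark job per key, so it is likely to suit a modest number of keys best.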

On 16 Jul 2015, at 12:21, Yiannis Gkoufas <jo...@gmail.com> wrote:

> Hi Eugene,
> 
> thanks for your response!
> Your recommendation makes sense; it is more or less what I tried.
> The problem I am facing is that inside foreachPartition() I cannot create a new RDD and call saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
> 
> Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel
> 
> Probably it's hitting a race condition.
> 
> Has anyone else faced this situation? Any suggestions?
> 
> Thanks a lot! 

Eugene Morozov
fathersson@list.ru





Re: Sorted Multiple Outputs

Posted by Yiannis Gkoufas <jo...@gmail.com>.
Hi Eugene,

thanks for your response!
Your recommendation makes sense; it is more or less what I tried.
The problem I am facing is that inside foreachPartition() I cannot
create a new RDD and call saveAsTextFile.
It would probably make sense to write directly to HDFS using the Java API.
When I tried that I was getting errors similar to this:

Failed on local exception: java.io.InterruptedIOException: Interruped while
waiting for IO on channel java.nio.channels.SocketChannel

Probably it's hitting a race condition.

Has anyone else faced this situation? Any suggestions?

Thanks a lot!

On 15 July 2015 at 14:04, Eugene Morozov <fa...@list.ru> wrote:

> Yiannis,
>
> It looks like you might explore another approach.
>
> sc.textFile("input/path")
>   .map()        // your own implementation
>   .partitionBy(new HashPartitioner(num))
>   .groupBy()    // your own implementation; the result is a PairRDD of
>                 // key vs. Iterable of values
>   .foreachPartition()
>
> In the last step you could sort all values for each key and store them in
> a separate file, even in the same directory as the files for the other
> keys.
> HashPartitioner should guarantee that all values for a specific key
> reside in just one partition, but one partition might contain more than
> one key (with its values). I’m not sure about this, but it shouldn’t be a
> big deal, as you would iterate over tuple<key, Iterable<value>> and store
> each key in its own file.

Re: Sorted Multiple Outputs

Posted by Eugene Morozov <fa...@list.ru>.
Yiannis,

It looks like you might explore another approach.

sc.textFile("input/path")
  .map()        // your own implementation
  .partitionBy(new HashPartitioner(num))
  .groupBy()    // your own implementation; the result is a PairRDD of key vs. Iterable of values
  .foreachPartition()

In the last step you could sort all values for each key and store them in a separate file, even in the same directory as the files for the other keys.
HashPartitioner should guarantee that all values for a specific key reside in just one partition, but one partition might contain more than one key (with its values). I’m not sure about this, but it shouldn’t be a big deal, as you would iterate over tuple<key, Iterable<value>> and store each key in its own file.
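
The partitioning behaviour described above can be illustrated in plain Scala, without a cluster. This sketch assumes that Spark's HashPartitioner places a key at `key.hashCode` modulo the number of partitions, adjusted to be non-negative; `HashPartitionDemo`, `partitionFor`, and the sample keys are hypothetical and exist only for this example.

```scala
object HashPartitionDemo {
  // Hash-based placement: hashCode modulo the partition count, kept non-negative.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16", "2015-07-17")
    // With more keys than partitions, at least one partition must hold several keys.
    val byPartition = keys.groupBy(partitionFor(_, 2))
    byPartition.toSeq.sortBy(_._1).foreach { case (p, ks) =>
      println(s"partition $p -> ${ks.mkString(", ")}")
    }
  }
}
```

Since the placement depends only on the key, every record with the same key lands in the same partition, while a single partition can still receive several distinct keys. That is why the per-partition loop has to open one file per key rather than one file per partition.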

On 15 Jul 2015, at 03:23, Yiannis Gkoufas <jo...@gmail.com> wrote:

> Hi there,
> 
> I have been using the approach described here:
> 
> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
> 
> In addition to that, I was wondering if there is a way to customize the order of the values contained in each file.
> 
> Thanks a lot!

Eugene Morozov
fathersson@list.ru