Posted to user@spark.apache.org by rishikesh <ri...@hotmail.com> on 2015/07/20 06:53:32 UTC

Error while Partitioning

Hi

I am executing a simple flow, as shown below:

data = sc.wholeTextFiles(...)
tokens = data.flatMap(<<function>>)

counts = tokens.map(lambda token: (token, 1))
counters = counts.reduceByKey(lambda a, b: a + b)
counters.sortBy(lambda x: x[1], False).saveAsTextFile(...)

There are some problems that I am facing

1. If I execute this code as is, it runs fine, but it takes a lot of time.
Given my cluster size and the data size, I feel the runtime is on the high
side. What I realized is that the RDD referenced by the variable data has
only 2 partitions, which is probably one reason for the slowness.
2. To work around the partitioning problem, I added a second argument to
sc.wholeTextFiles (the minPartitions argument) and passed a higher number
like 100. That gave a speed-up, but then I see this exception:

  File "/hadoop/yarn/local/usercache/nrec/filecache/512/spark-assembly-1.2.1.2.2.6.0-2800-hadoop2.6.0.2.2.6.0-2800.jar/pyspark/worker.py", line 107, in main
    process()
  File "/hadoop/yarn/local/usercache/nrec/filecache/512/spark-assembly-1.2.1.2.2.6.0-2800-hadoop2.6.0.2.2.6.0-2800.jar/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.2.6.0-2800/spark/python/pyspark/rdd.py", line 2081, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/hdp/2.2.6.0-2800/spark/python/pyspark/rdd.py", line 2081, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/hdp/2.2.6.0-2800/spark/python/pyspark/rdd.py", line 2081, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/hdp/2.2.6.0-2800/spark/python/pyspark/rdd.py", line 245, in func
    return chain.from_iterable(imap(f, iterator))
SystemError: Objects/cellobject.c:24: bad argument to internal function

	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

The processing does, however, usually complete and I do get results, though
sometimes even that does not happen. I am on Spark version 1.2.1.
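
For reference, the read now looks roughly like this (input_path is just a
placeholder for my real path; the getNumPartitions() call shows how the
partition count can be checked):

data = sc.wholeTextFiles(input_path, 100)  # second argument is minPartitions
print(data.getNumPartitions())             # reports 2 when minPartitions is not passed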

3. No matter how many partitions I use, the last stage takes a really long
time, roughly 95% of the total execution time. I am assuming the reduce is
the bottleneck and that I need to do some sort of combine operation before
the reduce, as sketched below. Is that supported in Spark, or is there some
better alternative?
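
What I mean by a combine step is something like the following rough sketch:
count tokens locally inside each partition first, so less data is shuffled
before the reduceByKey (count_partition is just an illustrative helper name,
and tokens is the RDD from the flow above):

from collections import defaultdict

def count_partition(iterator):
    # build a local token -> count map within one partition
    local = defaultdict(int)
    for token in iterator:
        local[token] += 1
    return local.items()

counters = tokens.mapPartitions(count_partition).reduceByKey(lambda a, b: a + b)

I do not know whether this is idiomatic, or whether reduceByKey already does
something like this internally.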

Thanks
Rishi




