Posted to user@spark.apache.org by Paul Tremblay <pa...@gmail.com> on 2017/02/06 22:35:27 UTC

wholeTextFiles fails, but textFile succeeds for same path

When I try to create an RDD using wholeTextFiles, I get an 
incomprehensible error. But when I use the same path with sc.textFile, I 
get no error.

I am using PySpark with Spark 2.1.

in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/'

rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
    1341
    1342             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343             res = self.context.runJob(self, takeUpToNumLeft, p)
    1344
    1345             items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
     963         # SparkContext#runJob.
     964         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
     966         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
     967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
    1131         answer = self.gateway_client.send_command(command)
    1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
    1134
    1135         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
      61     def deco(*a, **kw):
      62         try:
---> 63             return f(*a, **kw)
      64         except py4j.protocol.Py4JJavaError as e:
      65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
     317                 raise Py4JJavaError(
     318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
     320             else:
     321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486415078210_0005_01_000016 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486415078210_0005_01_000016
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
     at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
     at org.apache.hadoop.util.Shell.run(Shell.java:479)
     at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
     at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
     at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
     at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     at java.lang.Thread.run(Thread.java:745)

rdd = sc.textFile(in_path)

In [8]: rdd.take(1)
Out[8]: [u'WARC/1.0']


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: wholeTextFiles fails, but textFile succeeds for same path

Posted by Henry Tremblay <pa...@gmail.com>.
There are 51,000 files at about 1/2 MB per file. I am wondering if I need S3DistCp:

http://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

Although if I am understanding you correctly, even if I copy the S3 
files to HDFS on EMR, and use wholeTextFiles, I am still only going to 
be able to use a single executor?

Henry

On 02/11/2017 01:03 PM, Jörn Franke wrote:
> Can you post more information about the number of files, their size,
> and the executor logs?
>
> A gzipped file is not splittable, i.e. only one executor can gunzip it
> (the unzipped data can then be processed in parallel).
> wholeTextFiles was designed to be executed only on one executor (e.g.
> for processing XML files, which are difficult to process in parallel).
>
> Then, if you have small files (< HDFS block size), they are also only
> processed on one executor by default.
>
> You may repartition, though, for parallel processing even in those cases.

-- 
Henry Tremblay
Robert Half Technology


Re: wholeTextFiles fails, but textFile succeeds for same path

Posted by Jörn Franke <jo...@gmail.com>.
Can you post more information about the number of files, their size, and the executor logs?

A gzipped file is not splittable, i.e. only one executor can gunzip it (the unzipped data can then be processed in parallel).
wholeTextFiles was designed to be executed only on one executor (e.g. for processing XML files, which are difficult to process in parallel).

Then, if you have small files (< HDFS block size), they are also only processed on one executor by default.

You may repartition, though, for parallel processing even in those cases.
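
A rough sketch of the repartitioning suggestion, assuming an existing SparkContext is passed in as `sc`. `wholeTextFiles` accepts an optional `minPartitions` hint and `repartition` forces a shuffle into a chosen number of partitions; the helper names and the files-per-partition figure are illustrative assumptions, not something stated in this thread.

```python
import math


def suggest_partitions(num_files, files_per_partition=500):
    """Hypothetical heuristic: one partition per `files_per_partition` files."""
    return max(1, math.ceil(num_files / files_per_partition))


def read_whole_files(sc, path, num_files):
    """Read (filename, content) pairs and spread them over several partitions.

    `sc` is an existing SparkContext; no job runs until an action is called.
    """
    n = suggest_partitions(num_files)
    rdd = sc.wholeTextFiles(path, minPartitions=n)  # minPartitions is only a hint
    if rdd.getNumPartitions() < n:
        rdd = rdd.repartition(n)  # shuffle so later stages can run in parallel
    return rdd
```

Note that `repartition` only helps the stages after the read; the read itself still uses whatever partitioning the input format produced.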

> On 11 Feb 2017, at 21:40, Paul Tremblay <pa...@gmail.com> wrote:
> 
> I've been working on this problem for several days (I am doing this more to increase my knowledge of Spark). The code you linked to hangs because after reading in the file, I have to gunzip it. 
> Another way that seems to be working is reading each file in using sc.textFile, then writing it to HDFS, and then using wholeTextFiles on the HDFS result. 
> But the bigger issue is that neither method is executed in parallel. When I open my YARN manager, it shows that only one node is being used. 
> 
> Henry

Re: wholeTextFiles fails, but textFile succeeds for same path

Posted by Paul Tremblay <pa...@gmail.com>.
I've been working on this problem for several days (I am doing this more 
to increase my knowledge of Spark). The code you linked to hangs because 
after reading in the file, I have to gunzip it.

Another way that seems to be working is reading each file in using 
sc.textFile, then writing it to HDFS, and then using wholeTextFiles 
on the HDFS result.

But the bigger issue is that neither method is executed in parallel. 
When I open my YARN manager, it shows that only one node is being used.


Henry
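
One possible way to do the gunzip step inside the tasks rather than afterwards is sketched below. It assumes Python 3, an existing SparkContext passed in as `sc`, and that each file fits comfortably in an executor's memory. `binaryFiles` and `mapValues` are standard PySpark calls; the helper names are made up for illustration.

```python
import gzip


def gunzip_bytes(data):
    """Decompress one gzipped file's bytes (concatenated members are handled)."""
    return gzip.decompress(bytes(data))


def read_gzipped_files(sc, path, min_partitions=None):
    """Return an RDD of (filename, decompressed bytes), one record per file."""
    raw = sc.binaryFiles(path, minPartitions=min_partitions)
    return raw.mapValues(gunzip_bytes)  # decompression runs on the executors
```

Because each file is still one record, a single very large file is decompressed by a single task; the parallelism comes from having many files.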


On 02/06/2017 03:39 PM, Jon Gregg wrote:
> Strange that it's working for some directories but not others.  Looks 
> like wholeTextFiles maybe doesn't work with S3? 
> https://issues.apache.org/jira/browse/SPARK-4414 .
>
> If it's possible to load the data into EMR and run Spark from there, 
> that may be a workaround.  This blog post shows a Python workaround 
> that might work as well: 
> http://michaelryanbell.com/processing-whole-files-spark-s3.html
>
> Jon


Re: wholeTextFiles fails, but textFile succeeds for same path

Posted by Jon Gregg <co...@gmail.com>.
Strange that it's working for some directories but not others.  Looks like
wholeTextFiles maybe doesn't work with S3?
https://issues.apache.org/jira/browse/SPARK-4414 .

If it's possible to load the data into EMR and run Spark from there, that
may be a workaround.  This blog post shows a Python workaround that might
work as well:
http://michaelryanbell.com/processing-whole-files-spark-s3.html

Jon
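
A workaround of this general kind can be sketched as follows: list the object keys on the driver, distribute the keys, and let each task download its own objects. This is not taken from the linked post. It assumes the `boto3` package is installed on the driver and the executors, that credentials are already configured, and that `sc` is an existing SparkContext; all function names are hypothetical.

```python
def split_s3_uri(uri):
    """Split 's3://bucket/prefix' into (bucket, prefix)."""
    scheme, _, rest = uri.partition("://")
    if scheme not in ("s3", "s3a", "s3n") or not rest:
        raise ValueError("not an S3 URI: %r" % uri)
    bucket, _, prefix = rest.partition("/")
    return bucket, prefix


def list_keys(uri):
    """List all object keys under an S3 prefix (runs on the driver)."""
    import boto3  # imported lazily; assumed to be installed
    bucket, prefix = split_s3_uri(uri)
    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    return [obj["Key"] for page in pages for obj in page.get("Contents", [])]


def fetch_object(bucket, key):
    """Download one object and return (key, bytes); runs inside a task."""
    import boto3
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return key, body


def read_whole_objects(sc, uri, num_partitions):
    """Build an RDD of (key, bytes) with one download per key."""
    bucket, _ = split_s3_uri(uri)
    keys = sc.parallelize(list_keys(uri), num_partitions)
    return keys.map(lambda key: fetch_object(bucket, key))
```

Distributing keys instead of file contents means the number of partitions is chosen explicitly, independent of how the input format would have split the directory.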


On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay <pa...@gmail.com>
wrote:

> I've actually been able to trace the problem to the files being read in.
> If I change to a different directory, then I don't get the error. Is one of
> the executors running out of memory?

Re: wholeTextFiles fails, but textFile succeeds for same path

Posted by Paul Tremblay <pa...@gmail.com>.
I've actually been able to trace the problem to the files being read in. 
If I change to a different directory, then I don't get the error. Is one 
of the executors running out of memory?
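
The exit status in the trace may itself be a hint. Spark's source defines a small set of executor exit codes (in `SparkExitCode`), and 52 is believed to be the one used when the executor JVM dies with an OutOfMemoryError. The table below is a sketch based on that assumption and should be checked against the Spark version in use; the helper name is hypothetical.

```python
# Exit codes believed to be defined in Spark's SparkExitCode (verify for your version).
SPARK_EXIT_CODES = {
    50: "uncaught exception",
    51: "uncaught exception, with a further failure while logging it",
    52: "java.lang.OutOfMemoryError in the executor JVM",
}


def describe_exit_code(code):
    """Return a short description for an executor exit code, if known."""
    return SPARK_EXIT_CODES.get(code, "unknown exit code %d" % code)


print(describe_exit_code(52))
```

If that reading is right, the executor logs for the failed container should show the OutOfMemoryError directly.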



