You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Praveen Sripati <pr...@gmail.com> on 2014/09/22 14:39:15 UTC

Error while calculating the max temperature

Hi,

I am writing a Spark program in Python to find the maximum temperature for
a year, given a weather dataset. The below program throws an error when I
try to execute the Spark program.

TypeError: 'NoneType' object is not iterable


org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)

org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

If I replace +9999 with 9999 in the extractData function, the program
executes without any error. The code with +9999 works in the Hadoop
streaming, but not with Spark pipes. How to get around the problem? Has to
do it with the way encoding is done within Spark?

-------------

import re
import sys

from pyspark import SparkContext

#Create Spark Context with the master details and the application name
sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")

#define the accumulator
invalidRecordsAccumulator = sc.accumulator(0)
validRecordsAccumulator = sc.accumulator(0)

logFile = "hdfs://localhost:9000/user/bigdatavm/input"

#Create an RDD from the input data in HDFS
weatherData = sc.textFile(logFile)

#function to extract the data from the line
#based on position and filter out the invalid records









*def extractData(line):    global invalidRecordsAccumulator    val =
line.strip()    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
validRecordsAccumulator += 1    return (year, temp)    else:
invalidRecordsAccumulator += 1*


#Transform the data to extract/filter and then find the max temperature
max_temperature_per_year = weatherData.map(extractData).reduceByKey(lambda
a,b : a if int(a) > int(b) else b)

#Save the RDD back into HDFS
max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")

print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value

Thanks,
Praveen

Re: Error while calculating the max temperature

Posted by Praveen Sripati <pr...@gmail.com>.
Hi Sean,

Thanks for the response. I changed from map to flatMap and in the function
return a list as below

if (temp != "+9999" and re.match("[01459]", q)):
    return [(year,temp)]
else:
    return []

Thanks,
Praveen

On Mon, Sep 22, 2014 at 9:26 PM, Sean Owen <so...@cloudera.com> wrote:

> If your map() sometimes does not emit an element, then you need to
> call flatMap() instead, and emit Some(value) (or any collection of
> values) if there is an element to return, or None otherwise.
>
> On Mon, Sep 22, 2014 at 4:50 PM, Praveen Sripati
> <pr...@gmail.com> wrote:
> > During the map based on some conditions if some of the rows are ignored
> > (without any transformation) then then there is a record by None in the
> > output RDD for the ignored records. And reduceByKey is not able to handle
> > this type of None record and so the exception. I tried filter, but it is
> > also not able to handle the None record as input.
> >
> > How to get around this?
> >
> > Thanks,
> > Praveen
> >
> > On Mon, Sep 22, 2014 at 6:09 PM, Praveen Sripati <
> praveensripati@gmail.com>
> > wrote:
> >>
> >> Hi,
> >>
> >> I am writing a Spark program in Python to find the maximum temperature
> for
> >> a year, given a weather dataset. The below program throws an error when
> I
> >> try to execute the Spark program.
> >>
> >> TypeError: 'NoneType' object is not iterable
> >>
> >>
> >> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
> >>
> >>
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
> >>
>  org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
> >>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >>
> >> If I replace +9999 with 9999 in the extractData function, the program
> >> executes without any error. The code with +9999 works in the Hadoop
> >> streaming, but not with Spark pipes. How to get around the problem? Has
> to
> >> do it with the way encoding is done within Spark?
> >>
> >> -------------
> >>
> >> import re
> >> import sys
> >>
> >> from pyspark import SparkContext
> >>
> >> #Create Spark Context with the master details and the application name
> >> sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
> >>
> >> #define the accumulator
> >> invalidRecordsAccumulator = sc.accumulator(0)
> >> validRecordsAccumulator = sc.accumulator(0)
> >>
> >> logFile = "hdfs://localhost:9000/user/bigdatavm/input"
> >>
> >> #Create an RDD from the input data in HDFS
> >> weatherData = sc.textFile(logFile)
> >>
> >> #function to extract the data from the line
> >> #based on position and filter out the invalid records
> >> def extractData(line):
> >>
> >>     global invalidRecordsAccumulator
> >>     val = line.strip()
> >>     (year, temp, q) = (val[15:19], val[87:92], val[92:93])
> >>     if (temp != "+9999" and re.match("[01459]", q)):
> >>         validRecordsAccumulator += 1
> >>     return (year, temp)
> >>     else:
> >>        invalidRecordsAccumulator += 1
> >>
> >>
> >> #Transform the data to extract/filter and then find the max temperature
> >> max_temperature_per_year =
> weatherData.map(extractData).reduceByKey(lambda
> >> a,b : a if int(a) > int(b) else b)
> >>
> >> #Save the RDD back into HDFS
> >>
> >>
> max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
> >>
> >> print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
> >> print "----->>>>>> Invalid records = %d" %
> invalidRecordsAccumulator.value
> >>
> >> Thanks,
> >> Praveen
> >
> >
>

Re: Error while calculating the max temperature

Posted by Sean Owen <so...@cloudera.com>.
If your map() sometimes does not emit an element, then you need to
call flatMap() instead, and emit Some(value) (or any collection of
values) if there is an element to return, or None otherwise.

On Mon, Sep 22, 2014 at 4:50 PM, Praveen Sripati
<pr...@gmail.com> wrote:
> During the map based on some conditions if some of the rows are ignored
> (without any transformation) then then there is a record by None in the
> output RDD for the ignored records. And reduceByKey is not able to handle
> this type of None record and so the exception. I tried filter, but it is
> also not able to handle the None record as input.
>
> How to get around this?
>
> Thanks,
> Praveen
>
> On Mon, Sep 22, 2014 at 6:09 PM, Praveen Sripati <pr...@gmail.com>
> wrote:
>>
>> Hi,
>>
>> I am writing a Spark program in Python to find the maximum temperature for
>> a year, given a weather dataset. The below program throws an error when I
>> try to execute the Spark program.
>>
>> TypeError: 'NoneType' object is not iterable
>>
>>
>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>>
>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>
>> If I replace +9999 with 9999 in the extractData function, the program
>> executes without any error. The code with +9999 works in the Hadoop
>> streaming, but not with Spark pipes. How to get around the problem? Has to
>> do it with the way encoding is done within Spark?
>>
>> -------------
>>
>> import re
>> import sys
>>
>> from pyspark import SparkContext
>>
>> #Create Spark Context with the master details and the application name
>> sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
>>
>> #define the accumulator
>> invalidRecordsAccumulator = sc.accumulator(0)
>> validRecordsAccumulator = sc.accumulator(0)
>>
>> logFile = "hdfs://localhost:9000/user/bigdatavm/input"
>>
>> #Create an RDD from the input data in HDFS
>> weatherData = sc.textFile(logFile)
>>
>> #function to extract the data from the line
>> #based on position and filter out the invalid records
>> def extractData(line):
>>
>>     global invalidRecordsAccumulator
>>     val = line.strip()
>>     (year, temp, q) = (val[15:19], val[87:92], val[92:93])
>>     if (temp != "+9999" and re.match("[01459]", q)):
>>         validRecordsAccumulator += 1
>>     return (year, temp)
>>     else:
>>        invalidRecordsAccumulator += 1
>>
>>
>> #Transform the data to extract/filter and then find the max temperature
>> max_temperature_per_year = weatherData.map(extractData).reduceByKey(lambda
>> a,b : a if int(a) > int(b) else b)
>>
>> #Save the RDD back into HDFS
>>
>> max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
>>
>> print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
>> print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value
>>
>> Thanks,
>> Praveen
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Error while calculating the max temperature

Posted by Praveen Sripati <pr...@gmail.com>.
During the map based on some conditions if some of the rows are ignored
(without any transformation) then then there is a record by None in the
output RDD for the ignored records. And reduceByKey is not able to handle
this type of None record and so the exception. I tried filter, but it is
also not able to handle the None record as input.

How to get around this?

Thanks,
Praveen

On Mon, Sep 22, 2014 at 6:09 PM, Praveen Sripati <pr...@gmail.com>
wrote:

> Hi,
>
> I am writing a Spark program in Python to find the maximum temperature for
> a year, given a weather dataset. The below program throws an error when I
> try to execute the Spark program.
>
> TypeError: 'NoneType' object is not iterable
>
>
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>
> If I replace +9999 with 9999 in the extractData function, the program
> executes without any error. The code with +9999 works in the Hadoop
> streaming, but not with Spark pipes. How to get around the problem? Has to
> do it with the way encoding is done within Spark?
>
> -------------
>
> import re
> import sys
>
> from pyspark import SparkContext
>
> #Create Spark Context with the master details and the application name
> sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
>
> #define the accumulator
> invalidRecordsAccumulator = sc.accumulator(0)
> validRecordsAccumulator = sc.accumulator(0)
>
> logFile = "hdfs://localhost:9000/user/bigdatavm/input"
>
> #Create an RDD from the input data in HDFS
> weatherData = sc.textFile(logFile)
>
> #function to extract the data from the line
> #based on position and filter out the invalid records
>
>
>
>
>
>
>
>
>
> *def extractData(line):    global invalidRecordsAccumulator    val =
> line.strip()    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
> if (temp != "+9999" and re.match("[01459]", q)):
> validRecordsAccumulator += 1    return (year, temp)    else:
> invalidRecordsAccumulator += 1*
>
>
> #Transform the data to extract/filter and then find the max temperature
> max_temperature_per_year = weatherData.map(extractData).reduceByKey(lambda
> a,b : a if int(a) > int(b) else b)
>
> #Save the RDD back into HDFS
>
> max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
>
> print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
> print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value
>
> Thanks,
> Praveen
>