Posted to issues@spark.apache.org by "Reynold Xin (JIRA)" <ji...@apache.org> on 2015/06/11 10:01:02 UTC

[jira] [Resolved] (SPARK-6411) PySpark DataFrames can't be created if any datetimes have timezones

     [ https://issues.apache.org/jira/browse/SPARK-6411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin resolved SPARK-6411.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 1.5.0

> PySpark DataFrames can't be created if any datetimes have timezones
> -------------------------------------------------------------------
>
>                 Key: SPARK-6411
>                 URL: https://issues.apache.org/jira/browse/SPARK-6411
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 1.3.0
>            Reporter: Harry Brundage
>            Assignee: Davies Liu
>             Fix For: 1.5.0
>
>
> I am unable to create a DataFrame with PySpark if any of the {{datetime}} objects that pass through the conversion process has its {{tzinfo}} property set.
> This works fine:
> {code}
> In [9]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF().collect()
> Out[9]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]
> {code}
> As expected, the tuple's schema is inferred as having one anonymous column with a datetime field, and the datetime round-trips through the Java-side Python deserialization and then back into Python upon {{collect}}. This, however:
> {code}
> In [5]: from dateutil.tz import tzutc
> In [10]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()),)]).toDF().collect()
> {code}
> explodes with
> {code}
> Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 12, localhost): net.razorvine.pickle.PickleException: invalid pickle data for datetime; expected 1 or 7 args, got 2
> 	at net.razorvine.pickle.objects.DateTimeConstructor.createDateTime(DateTimeConstructor.java:69)
> 	at net.razorvine.pickle.objects.DateTimeConstructor.construct(DateTimeConstructor.java:32)
> 	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
> 	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
> 	at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
> 	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
> 	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:154)
> 	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> 	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:114)
> 	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:114)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:114)
> 	at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> 	at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> By the looks of the error, it would appear that the Java unpickler isn't expecting the pickle stream to provide the extra timezone constructor argument.
> Here's the disassembled pickle stream for a timezone-less datetime:
> {code}
> >>> object = datetime.datetime(2014, 7, 8, 11, 10)
> >>> stream = pickle.dumps(object)
> >>> pickletools.dis(stream)
>     0: c    GLOBAL     'datetime datetime'
>    19: p    PUT        0
>    22: (    MARK
>    23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
>    65: p        PUT        1
>    68: t        TUPLE      (MARK at 22)
>    69: p    PUT        2
>    72: R    REDUCE
>    73: p    PUT        3
>    76: .    STOP
> highest protocol among opcodes = 0
> {code}
> and then for one with a timezone:
> {code}
> >>> object = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
> >>> stream = pickle.dumps(object)
> >>> pickletools.dis(stream)
>     0: c    GLOBAL     'datetime datetime'
>    19: p    PUT        0
>    22: (    MARK
>    23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
>    65: p        PUT        1
>    68: c        GLOBAL     'copy_reg _reconstructor'
>    93: p        PUT        2
>    96: (        MARK
>    97: c            GLOBAL     'dateutil.tz tzutc'
>   116: p            PUT        3
>   119: c            GLOBAL     'datetime tzinfo'
>   136: p            PUT        4
>   139: g            GET        4
>   142: (            MARK
>   143: t                TUPLE      (MARK at 142)
>   144: R            REDUCE
>   145: p            PUT        5
>   148: t            TUPLE      (MARK at 96)
>   149: p        PUT        6
>   152: R        REDUCE
>   153: p        PUT        7
>   156: t        TUPLE      (MARK at 22)
>   157: p    PUT        8
>   160: R    REDUCE
>   161: p    PUT        9
>   164: .    STOP
> highest protocol among opcodes = 0
> {code}
> I would bet that the Pyrolite library is missing support for that nested object as a second tuple member in the reconstruction of the datetime object. Has anyone hit this before? Any more information I can provide?
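
The difference between the two pickle streams above can be checked without disassembling them. The following is a minimal sketch, not the fix that was applied for this issue. It assumes Python 3 and uses the standard library's datetime.timezone.utc as a stand-in for dateutil's tzutc(), whereas the report itself was produced on Python 2. The helper names reduce_arg_count and to_naive_utc are hypothetical and exist only for illustration. The sketch counts the constructor arguments that datetime hands to pickle (one for a naive value, two once tzinfo is set, matching the "expected 1 or 7 args, got 2" message) and shows one possible client-side workaround: normalizing values to naive UTC before they reach Spark.

```python
import datetime

# Assumption: Python 3's built-in UTC tzinfo stands in for dateutil.tz.tzutc().
UTC = datetime.timezone.utc

naive = datetime.datetime(2014, 7, 8, 11, 10)
aware = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=UTC)


def reduce_arg_count(dt):
    """Return how many constructor arguments pickle would record for dt."""
    # __reduce__ returns (callable, args); args is what the unpickler
    # passes back to the datetime constructor.
    _constructor, args = dt.__reduce__()[:2]
    return len(args)


def to_naive_utc(dt):
    """Hypothetical workaround: convert an aware datetime to naive UTC.

    Naive datetimes are returned unchanged.
    """
    if dt.tzinfo is None:
        return dt
    return dt.astimezone(UTC).replace(tzinfo=None)


print(reduce_arg_count(naive))                # packed state only
print(reduce_arg_count(aware))                # packed state plus tzinfo
print(reduce_arg_count(to_naive_utc(aware)))  # back to the naive form
```

Dropping tzinfo this way discards the original offset, so it is only appropriate when all values can be interpreted as UTC downstream.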


