Posted to issues@spark.apache.org by "Reynold Xin (JIRA)" <ji...@apache.org> on 2015/06/11 10:01:02 UTC
[jira] [Resolved] (SPARK-6411) PySpark DataFrames can't be created if any datetimes have timezones
[ https://issues.apache.org/jira/browse/SPARK-6411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Reynold Xin resolved SPARK-6411.
--------------------------------
Resolution: Fixed
Fix Version/s: 1.5.0
> PySpark DataFrames can't be created if any datetimes have timezones
> -------------------------------------------------------------------
>
> Key: SPARK-6411
> URL: https://issues.apache.org/jira/browse/SPARK-6411
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 1.3.0
> Reporter: Harry Brundage
> Assignee: Davies Liu
> Fix For: 1.5.0
>
>
> I am unable to create a DataFrame with PySpark if any of the {{datetime}} objects that pass through the conversion process have a {{tzinfo}} property set.
> This works fine:
> {code}
> In [9]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF().collect()
> Out[9]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]
> {code}
> As expected, the tuple's schema is inferred as one anonymous column of datetime type, and the datetime round-trips through the Java-side deserialization of the Python pickle and back into Python on {{collect}}. This, however:
> {code}
> In [5]: from dateutil.tz import tzutc
> In [10]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()),)]).toDF().collect()
> {code}
> explodes with
> {code}
> Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 12, localhost): net.razorvine.pickle.PickleException: invalid pickle data for datetime; expected 1 or 7 args, got 2
> at net.razorvine.pickle.objects.DateTimeConstructor.createDateTime(DateTimeConstructor.java:69)
> at net.razorvine.pickle.objects.DateTimeConstructor.construct(DateTimeConstructor.java:32)
> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
> at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
> at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:154)
> at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:114)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:114)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:114)
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> Judging by the error, the Java unpickler isn't expecting the pickle stream to provide that extra timezone constructor argument.
> Here's the disassembled pickle stream for a timezone-less datetime:
> {code}
> >>> object = datetime.datetime(2014, 7, 8, 11, 10)
> >>> stream = pickle.dumps(object)
> >>> pickletools.dis(stream)
> 0: c GLOBAL 'datetime datetime'
> 19: p PUT 0
> 22: ( MARK
> 23: S STRING '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
> 65: p PUT 1
> 68: t TUPLE (MARK at 22)
> 69: p PUT 2
> 72: R REDUCE
> 73: p PUT 3
> 76: . STOP
> highest protocol among opcodes = 0
> {code}
> and then for one with a timezone:
> {code}
> >>> object = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
> >>> stream = pickle.dumps(object)
> >>> pickletools.dis(stream)
> 0: c GLOBAL 'datetime datetime'
> 19: p PUT 0
> 22: ( MARK
> 23: S STRING '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
> 65: p PUT 1
> 68: c GLOBAL 'copy_reg _reconstructor'
> 93: p PUT 2
> 96: ( MARK
> 97: c GLOBAL 'dateutil.tz tzutc'
> 116: p PUT 3
> 119: c GLOBAL 'datetime tzinfo'
> 136: p PUT 4
> 139: g GET 4
> 142: ( MARK
> 143: t TUPLE (MARK at 142)
> 144: R REDUCE
> 145: p PUT 5
> 148: t TUPLE (MARK at 96)
> 149: p PUT 6
> 152: R REDUCE
> 153: p PUT 7
> 156: t TUPLE (MARK at 22)
> 157: p PUT 8
> 160: R REDUCE
> 161: p PUT 9
> 164: . STOP
> highest protocol among opcodes = 0
> {code}
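The difference between the two streams can also be seen without disassembling them. The following is only a minimal sketch, assuming Python 3 and the standard library's datetime.timezone.utc in place of dateutil's tzutc (the report above used Python 2). The helper name constructor_args is hypothetical. It prints how many arguments pickle would hand to the datetime constructor on REDUCE, which is the count the Java side complains about:

```python
import datetime

naive = datetime.datetime(2014, 7, 8, 11, 10)
# Assumption: stdlib UTC tzinfo stands in for dateutil.tz.tzutc() from the report.
aware = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=datetime.timezone.utc)


def constructor_args(value):
    """Return the argument tuple that unpickling passes to the datetime constructor."""
    _callable, args = value.__reduce__()[:2]
    return args


naive_args = constructor_args(naive)
aware_args = constructor_args(aware)

print(len(naive_args))  # 1: only the packed date/time state
print(len(aware_args))  # 2: packed state plus the tzinfo object
```

The two-argument form matches the "expected 1 or 7 args, got 2" message in the stack trace.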
> I would bet that the Pyrolite library is missing support for that nested object as a second tuple member in the reconstruction of the datetime object. Has anyone hit this before? Any more information I can provide?
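For anyone on an affected version, one possible workaround is to strip the tzinfo before the values reach toDF(). This is a sketch under assumptions, not the fix that went into 1.5.0: it assumes Python 3, that storing naive UTC values is acceptable for the data, and the helper name to_naive_utc is hypothetical. How Spark then interprets naive datetimes (UTC versus local time) is not covered here and should be checked separately.

```python
import datetime


def to_naive_utc(value):
    """Convert an aware datetime to a naive one expressed in UTC.

    Naive datetimes are returned unchanged.
    """
    if value.tzinfo is None:
        return value
    # Shift to UTC first so the wall-clock fields are UTC, then drop tzinfo.
    return value.astimezone(datetime.timezone.utc).replace(tzinfo=None)


aware = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=datetime.timezone.utc)
rows = [(to_naive_utc(aware),)]
print(rows)
```

The resulting rows contain only naive datetimes, which is the shape that worked in the first example of the report.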