Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/03 08:27:00 UTC

[jira] [Commented] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination

    [ https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832342#comment-16832342 ] 

Hyukjin Kwon commented on SPARK-27613:
--------------------------------------

Can you show the expected input/output? The description is hard to read and follow as it is.

> Caching an RDD composed of Row Objects produces some kind of key recombination
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-27613
>                 URL: https://issues.apache.org/jira/browse/SPARK-27613
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Andres Fernandez
>            Priority: Major
>
> (Code included at the bottom)
> The function "+create_dataframes_from_azure_responses_rdd+" receives *table_names* (_list_ of _str_) and *responses_rdd* (RDD of tuples <_str_, _str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It then iterates over the table names and creates one dataframe per table, filtering the RDD by the first tuple element and keeping only tuples with a valid (non-None) response.
> So far so good.
> The _QueryResponse_ object (from the azure.loganalytics package) contains, essentially, a list with 1 "_table_", which in turn has a "_columns_" and a "_rows_" field. Every single response (fifth element of the tuple, [4]) for the same table name (first element of the tuple, [0]) has exactly the same columns in the same order (the order only matters for indexing into the row data within the same response). The types are stored in *column_types*, taking the first response as the sample.
> Now to the tricky part.
> flatMap is called on the *responses_rdd* with the function "+tabularize_response_rdd+", which creates a Row object for every row (_list_ of _str_) in the _QueryResponse_. A schema is created based on a *type_map* from azure types to spark.sql.types so that it can be passed to the subsequent createDataFrame call. If the result of this flatMap, *table_tabular_rdd*, is **not** cached before creating the DataFrame from the Rows RDD, everything works smoothly. If, however, *table_tabular_rdd* is cached before creating the DataFrame, the values in the Row objects no longer match their keys.
> It is worth pointing out that when a Row object is created from an unpacked dict, the code [here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375] sorts the keys; is this behaviour overridden somehow by caching?
> Please let me know what I am doing wrong. Is there any best practice / documented solution I am not following? I'm just a beginner when it comes to Spark and would happily accept any suggestion. I hope I was clear enough, and I am open to giving you any additional details that might be helpful. Thank you! (Code and error attached as well.)
> The error looks as if it were related to casting, but it can be seen that the contents do not correspond to the key. The *record_count* key is actually a Long, but in the Row its value somehow got swapped for another key's value, in this case 'n/a'.
> {code:python}
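> import pyspark
> from pyspark.sql.types import StructType, StructField
> # Assumed to exist in the calling notebook: `spark` (SparkSession) and `type_map` (azure type name -> spark.sql.types class).
> def create_dataframes_from_azure_responses_rdd(table_names: list, responses_rdd: pyspark.RDD, verbose:bool=False) -> dict: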
>   ws_column_name = "WorkspaceId"
>   def tabularize_response_rdd(x: tuple):
>     import pyspark
>     tn, wsid, count, interval, response = x
>     ret = []
>     if response.tables[0].rows:
>       ret = [pyspark.sql.Row(**{ws_column_name:wsid, **{fi.name:r[i] for i,fi in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
>     return ret
>   data_frames = {}
>   for tn in table_names:
>     if verbose: print("Filtering RDD items for {}".format(tn))
>     table_response_rdd = responses_rdd.filter(lambda x: x[0]==tn and x[4] is not None).cache()
>     
>     data_frames[tn] = None
>     if verbose: print("Checking if RDD for {} has data".format(tn))
>     if not table_response_rdd.isEmpty():
>       if verbose: print("Getting column types for {} from azure response".format(tn))
>       column_types = {f.name:f.type for f in table_response_rdd.take(1)[0][4].tables[0].columns}
>       column_types[ws_column_name] = "string"
>       if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
>       table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) #.cache() #Error with cache, no error without!
>       if verbose: print("Getting sample row for {}".format(tn))
>       row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
>       if verbose: print("Building schema for {} from sample row and column types".format(tn))
>       current_schema = StructType([StructField(f, type_map[column_types[f]](), True) for f in row_fields])
>       if verbose: print("Creating dataframe for {}".format(tn))
>       table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
>       if verbose: print("Calculating expected count for {}".format(tn))
>       expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
>       real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
>       table_response_rdd.unpersist()
>       #table_tabular_rdd.unpersist()
>       if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
>       data_frames[tn]=table_df
>     else:
>       if verbose: print("{} table was empty!".format(tn))
>   return data_frames
> {code}
> {noformat}
> Py4JJavaError                             Traceback (most recent call last)
> <command-2824384765475765> in <module>()
>       1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1, sta, container, sta_key, tid, creds, 5000, True)
>       2 resrdds.cache()
> ----> 3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds, True)
>       4 resrdds.unpersist()
>
> <command-2824384765475774> in create_dataframes_from_azure_responses_rdd(table_names, responses_rdd, verbose)
>      37 if verbose: print("Calculating expected count for {}".format(tn))
>      38 expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
> ---> 39 real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
>      40 table_response_rdd.unpersist()
>      41 #table_tabular_rdd.unpersist()
>
> /databricks/spark/python/pyspark/sql/dataframe.py in collect(self)
>     546 # Default path used in OSS Spark / for non-DF-ACL clusters:
>     547 with SCCallSiteSync(self._sc) as css:
> --> 548 sock_info = self._jdf.collectToPython()
>     549 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
>     550
>
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
>    1255 answer = self.gateway_client.send_command(command)
>    1256 return_value = get_return_value(
> -> 1257 answer, self.gateway_client, self.target_id, self.name)
>    1258
>    1259 for temp_arg in temp_args:
>
> /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>      61 def deco(*a, **kw):
>      62 try:
> ---> 63 return f(*a, **kw)
>      64 except py4j.protocol.Py4JJavaError as e:
>      65 s = e.java_exception.toString()
>
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     326 raise Py4JJavaError(
>     327 "An error occurred while calling {0}{1}{2}.\n".
> --> 328 format(target_id, ".", name), value)
>     329 else:
>     330 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o700.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 403, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 398, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
>     return f(*args, **kwargs)
>   File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare
>     verify_func(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct
>     verifier(v)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default
>     verify_acceptable_types(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
>     % (dataType, obj, type(obj))))
> TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'>
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
>     at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
>     at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
>     at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:302)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>     at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
>     at org.apache.spark.scheduler.Task.run(Task.scala:112)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
>     at scala.Option.foreach(Option.scala:257)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2252)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
>     at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234)
>     at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269)
>     at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69)
>     at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
>     at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
>     at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469)
>     at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312)
>     at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289)
>     at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288)
>     at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423)
>     at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
>     at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
>     at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
>     at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
>     at py4j.Gateway.invoke(Gateway.java:295)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:251)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 403, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 398, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
>     return f(*args, **kwargs)
>   File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare
>     verify_func(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct
>     verifier(v)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default
>     verify_acceptable_types(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
>     % (dataType, obj, type(obj))))
> TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'>
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
>     at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
>     at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
>     at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:302)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>     at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
>     at org.apache.spark.scheduler.Task.run(Task.scala:112)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> {noformat}
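
The symptom in the report (the value 'n/a' landing under *record_count*) is what one would see if rows whose fields were sorted by name are matched against a differently ordered schema by position rather than by name. The following is a minimal pure-Python sketch of that effect. It is not Spark code and not a confirmed diagnosis of this issue; the helper names, field names, and values are all hypothetical.

```python
# Pure-Python illustration; no Spark required.
# Every name and value below is hypothetical and only mirrors the report.

def sorted_row(**fields):
    """Mimic a record built from an unpacked dict whose keys get sorted by name."""
    names = sorted(fields)
    return names, tuple(fields[n] for n in names)

def match_by_name(names, values, schema):
    """Pair each schema field with the value stored under the same name."""
    lookup = dict(zip(names, values))
    return [(field, lookup[field]) for field in schema]

def match_by_position(values, schema):
    """Pair schema fields with values purely by position."""
    return list(zip(schema, values))

def to_schema_tuple(record, schema):
    """Emit a plain tuple whose order is fixed by the schema itself."""
    return tuple(record[field] for field in schema)

record = {"WorkspaceId": "ws-1", "status": "n/a", "record_count": 42}
# A schema whose field order differs from the alphabetical key order.
schema = ["WorkspaceId", "status", "record_count"]

names, values = sorted_row(**record)

by_name = dict(match_by_name(names, values, schema))
by_position = dict(match_by_position(values, schema))
safe = dict(match_by_position(to_schema_tuple(record, schema), schema))

print("sorted names :", names)
print("by name      :", by_name["record_count"])
print("by position  :", by_position["record_count"])
print("schema tuple :", safe["record_count"])
```

If something like this is what happens after caching, building plain tuples in schema order (as in the hypothetical `to_schema_tuple` helper) before calling createDataFrame would remove any dependence on key order.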



