You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2023/02/14 03:56:00 UTC

[jira] [Resolved] (SPARK-41999) NPE for bucketed write (ReadwriterTests.test_bucketed_write)

     [ https://issues.apache.org/jira/browse/SPARK-41999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-41999.
----------------------------------
    Fix Version/s: 3.4.0
       Resolution: Fixed

Issue resolved by pull request 40002
[https://github.com/apache/spark/pull/40002]

> NPE for bucketed write (ReadwriterTests.test_bucketed_write)
> ------------------------------------------------------------
>
>                 Key: SPARK-41999
>                 URL: https://issues.apache.org/jira/browse/SPARK-41999
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Connect
>    Affects Versions: 3.4.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Major
>             Fix For: 3.4.0
>
>
> {code}
> java.util.NoSuchElementException
> 	at java.util.AbstractList$Itr.next(AbstractList.java:364)
> 	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
> 	at scala.collection.IterableLike.head(IterableLike.scala:109)
> 	at scala.collection.IterableLike.head$(IterableLike.scala:108)
> 	at scala.collection.AbstractIterable.head(Iterable.scala:56)
> 	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:1411)
> 	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:1297)
> 	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handleCommand(SparkConnectStreamHandler.scala:182)
> 	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:48)
> 	at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
> 	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
> 	at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
> 	at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
> 	at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
> 	at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 	at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> 23/01/12 11:27:45 ERROR SerializingExecutor: Exception while executing runnable org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6c9d5784
> java.lang.NullPointerException
> 	at org.sparkproject.connect.google_protos.rpc.Status$Builder.setMessage(Status.java:783)
> 	at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:112)
> 	at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:85)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
> 	at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:136)
> 	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
> 	at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
> 	at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
> 	at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
> 	at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 	at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> pyspark/sql/tests/test_readwriter.py:102 (ReadwriterParityTests.test_bucketed_write)
> self = <pyspark.sql.tests.connect.test_parity_readwriter.ReadwriterParityTests testMethod=test_bucketed_write>
>     def test_bucketed_write(self):
>         data = [
>             (1, "foo", 3.0),
>             (2, "foo", 5.0),
>             (3, "bar", -1.0),
>             (4, "bar", 6.0),
>         ]
>         df = self.spark.createDataFrame(data, ["x", "y", "z"])
>     
>         def count_bucketed_cols(names, table="pyspark_bucket"):
>             """Given a sequence of column names and a table name
>             query the catalog and return number of columns which are
>             used for bucketing
>             """
>             cols = self.spark.catalog.listColumns(table)
>             num = len([c for c in cols if c.name in names and c.isBucket])
>             return num
>     
>         with self.table("pyspark_bucket"):
>             # Test write with one bucketing column
> >           df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")
> ../test_readwriter.py:123: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> ../../connect/readwriter.py:381: in saveAsTable
>     self._spark.client.execute_command(self._write.command(self._spark.client))
> ../../connect/client.py:478: in execute_command
>     self._execute(req)
> ../../connect/client.py:562: in _execute
>     self._handle_error(rpc_error)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7fe0d069b5b0>
> rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
> 	status = StatusCode.UNKNOWN
> 	details = ""
> 	debug_error_string ...ved from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
> >
>     def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
>         """
>         Error handling helper for dealing with GRPC Errors. On the server side, certain
>         exceptions are enriched with additional RPC Status information. These are
>         unpacked in this function and put into the exception.
>     
>         To avoid overloading the user with GRPC errors, this message explicitly
>         swallows the error context from the call. This GRPC Error is logged however,
>         and can be enabled.
>     
>         Parameters
>         ----------
>         rpc_error : grpc.RpcError
>            RPC Error containing the details of the exception.
>     
>         Returns
>         -------
>         Throws the appropriate internal Python exception.
>         """
>         logger.exception("GRPC Error received")
>         # We have to cast the value here because an RpcError is a Call as well.
>         # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
>         status = rpc_status.from_call(cast(grpc.Call, rpc_error))
>         if status:
>             for d in status.details:
>                 if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
>                     info = error_details_pb2.ErrorInfo()
>                     d.Unpack(info)
>                     if info.reason == "org.apache.spark.sql.AnalysisException":
>                         raise SparkConnectAnalysisException(
>                             info.reason, info.metadata["message"], info.metadata["plan"]
>                         ) from None
>                     else:
>                         raise SparkConnectException(status.message, info.reason) from None
>     
>             raise SparkConnectException(status.message) from None
>         else:
> >           raise SparkConnectException(str(rpc_error)) from None
> E           pyspark.sql.connect.client.SparkConnectException: <_MultiThreadedRendezvous of RPC that terminated with:
> E           	status = StatusCode.UNKNOWN
> E           	details = ""
> E           	debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
> E           >
> ../../connect/client.py:640: SparkConnectException
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org