You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2023/02/14 00:46:00 UTC
[jira] [Assigned] (SPARK-41999) NPE for bucketed write (ReadwriterTests.test_bucketed_write)
[ https://issues.apache.org/jira/browse/SPARK-41999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-41999:
------------------------------------
Assignee: (was: Apache Spark)
> NPE for bucketed write (ReadwriterTests.test_bucketed_write)
> ------------------------------------------------------------
>
> Key: SPARK-41999
> URL: https://issues.apache.org/jira/browse/SPARK-41999
> Project: Spark
> Issue Type: Sub-task
> Components: Connect
> Affects Versions: 3.4.0
> Reporter: Hyukjin Kwon
> Priority: Major
>
> {code}
> java.util.NoSuchElementException
> at java.util.AbstractList$Itr.next(AbstractList.java:364)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
> at scala.collection.IterableLike.head(IterableLike.scala:109)
> at scala.collection.IterableLike.head$(IterableLike.scala:108)
> at scala.collection.AbstractIterable.head(Iterable.scala:56)
> at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:1411)
> at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:1297)
> at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handleCommand(SparkConnectStreamHandler.scala:182)
> at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:48)
> at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
> at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
> at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
> at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
> at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
> at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 23/01/12 11:27:45 ERROR SerializingExecutor: Exception while executing runnable org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6c9d5784
> java.lang.NullPointerException
> at org.sparkproject.connect.google_protos.rpc.Status$Builder.setMessage(Status.java:783)
> at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:112)
> at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:85)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
> at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:136)
> at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
> at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
> at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
> at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
> at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> pyspark/sql/tests/test_readwriter.py:102 (ReadwriterParityTests.test_bucketed_write)
> self = <pyspark.sql.tests.connect.test_parity_readwriter.ReadwriterParityTests testMethod=test_bucketed_write>
> def test_bucketed_write(self):
> data = [
> (1, "foo", 3.0),
> (2, "foo", 5.0),
> (3, "bar", -1.0),
> (4, "bar", 6.0),
> ]
> df = self.spark.createDataFrame(data, ["x", "y", "z"])
>
> def count_bucketed_cols(names, table="pyspark_bucket"):
> """Given a sequence of column names and a table name,
> query the catalog and return the number of columns which are
> used for bucketing
> """
> cols = self.spark.catalog.listColumns(table)
> num = len([c for c in cols if c.name in names and c.isBucket])
> return num
>
> with self.table("pyspark_bucket"):
> # Test write with one bucketing column
> > df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")
> ../test_readwriter.py:123:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> ../../connect/readwriter.py:381: in saveAsTable
> self._spark.client.execute_command(self._write.command(self._spark.client))
> ../../connect/client.py:478: in execute_command
> self._execute(req)
> ../../connect/client.py:562: in _execute
> self._handle_error(rpc_error)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7fe0d069b5b0>
> rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
> status = StatusCode.UNKNOWN
> details = ""
> debug_error_string ...ved from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
> >
> def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
> """
> Error handling helper for dealing with GRPC Errors. On the server side, certain
> exceptions are enriched with additional RPC Status information. These are
> unpacked in this function and put into the exception.
>
> To avoid overloading the user with GRPC errors, this message explicitly
> swallows the error context from the call. This GRPC Error is logged however,
> and can be enabled.
>
> Parameters
> ----------
> rpc_error : grpc.RpcError
> RPC Error containing the details of the exception.
>
> Returns
> -------
> Throws the appropriate internal Python exception.
> """
> logger.exception("GRPC Error received")
> # We have to cast the value here because an RpcError is a Call as well.
> # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
> status = rpc_status.from_call(cast(grpc.Call, rpc_error))
> if status:
> for d in status.details:
> if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
> info = error_details_pb2.ErrorInfo()
> d.Unpack(info)
> if info.reason == "org.apache.spark.sql.AnalysisException":
> raise SparkConnectAnalysisException(
> info.reason, info.metadata["message"], info.metadata["plan"]
> ) from None
> else:
> raise SparkConnectException(status.message, info.reason) from None
>
> raise SparkConnectException(status.message) from None
> else:
> > raise SparkConnectException(str(rpc_error)) from None
> E pyspark.sql.connect.client.SparkConnectException: <_MultiThreadedRendezvous of RPC that terminated with:
> E status = StatusCode.UNKNOWN
> E details = ""
> E debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
> E >
> ../../connect/client.py:640: SparkConnectException
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org