You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2023/01/12 05:39:00 UTC

[jira] [Updated] (SPARK-42016) Type inconsistency of struct and map when accessing the nested column

     [ https://issues.apache.org/jira/browse/SPARK-42016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-42016:
---------------------------------
    Epic Link:     (was: SPARK-39375)

> Type inconsistency of struct and map when accessing the nested column
> ---------------------------------------------------------------------
>
>                 Key: SPARK-42016
>                 URL: https://issues.apache.org/jira/browse/SPARK-42016
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Connect
>    Affects Versions: 3.4.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>
> {code}
> org.apache.spark.sql.AnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be "MAP<STRING, STRING>".
> 	at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidColumnOrFieldDataTypeError(QueryCompilationErrors.scala:3179)
> 	at org.apache.spark.sql.catalyst.plans.logical.Project$.reconcileColumnType(basicLogicalOperators.scala:163)
> 	at org.apache.spark.sql.catalyst.plans.logical.Project$.$anonfun$reorderFields$1(basicLogicalOperators.scala:203)
> 	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> 	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> 	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
> 	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> 	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> 	at org.apache.spark.sql.catalyst.plans.logical.Project$.reorderFields(basicLogicalOperators.scala:173)
> 	at org.apache.spark.sql.catalyst.plans.logical.Project$.matchSchema(basicLogicalOperators.scala:103)
> 	at org.apache.spark.sql.Dataset.to(Dataset.scala:485)
> 	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLocalRelation(SparkConnectPlanner.scala:635)
> 	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:83)
> 	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformProject(SparkConnectPlanner.scala:678)
> 	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:70)
> 	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLimit(SparkConnectPlanner.scala:758)
> 	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:72)
> 	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:58)
> 	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:49)
> 	at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
> 	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
> 	at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
> 	at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
> 	at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
> 	at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 	at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> pyspark/sql/tests/test_column.py:126 (ColumnParityTests.test_field_accessor)
> self = <pyspark.sql.tests.connect.test_parity_column.ColumnParityTests testMethod=test_field_accessor>
>     def test_field_accessor(self):
>         df = self.spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})])
> >       self.assertEqual(1, df.select(df.l[0]).first()[0])
> ../test_column.py:129: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> ../../connect/dataframe.py:340: in first
>     return self.head()
> ../../connect/dataframe.py:407: in head
>     rs = self.head(1)
> ../../connect/dataframe.py:409: in head
>     return self.take(n)
> ../../connect/dataframe.py:414: in take
>     return self.limit(num).collect()
> ../../connect/dataframe.py:1247: in collect
>     table = self._session.client.to_table(query)
> ../../connect/client.py:415: in to_table
>     table, _ = self._execute_and_fetch(req)
> ../../connect/client.py:593: in _execute_and_fetch
>     self._handle_error(rpc_error)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7f97b8eff580>
> rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
> 	status = StatusCode.INTERNAL
> 	details = "[INVALID_COLUMN_OR_FI...ATA_TYPE] Column or field `d` is of type \"STRUCT<k: STRING>\" while it\'s required to be \"MAP<STRING, STRING>\"."}"
> >
>     def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
>         """
>         Error handling helper for dealing with GRPC Errors. On the server side, certain
>         exceptions are enriched with additional RPC Status information. These are
>         unpacked in this function and put into the exception.
>     
>         To avoid overloading the user with GRPC errors, this message explicitly
>         swallows the error context from the call. This GRPC Error is logged however,
>         and can be enabled.
>     
>         Parameters
>         ----------
>         rpc_error : grpc.RpcError
>            RPC Error containing the details of the exception.
>     
>         Returns
>         -------
>         Throws the appropriate internal Python exception.
>         """
>         logger.exception("GRPC Error received")
>         # We have to cast the value here because, a RpcError is a Call as well.
>         # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
>         status = rpc_status.from_call(cast(grpc.Call, rpc_error))
>         if status:
>             for d in status.details:
>                 if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
>                     info = error_details_pb2.ErrorInfo()
>                     d.Unpack(info)
>                     if info.reason == "org.apache.spark.sql.AnalysisException":
> >                       raise SparkConnectAnalysisException(
>                             info.reason, info.metadata["message"], info.metadata["plan"]
>                         ) from None
> E                       pyspark.sql.connect.client.SparkConnectAnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be "MAP<STRING, STRING>".
> E                       Plan:
> ../../connect/client.py:632: SparkConnectAnalysisException
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org