Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2023/01/12 05:38:00 UTC

[jira] [Created] (SPARK-42016) Type inconsistency between struct and map when accessing a nested column

Hyukjin Kwon created SPARK-42016:
------------------------------------

             Summary: Type inconsistency between struct and map when accessing a nested column
                 Key: SPARK-42016
                 URL: https://issues.apache.org/jira/browse/SPARK-42016
             Project: Spark
          Issue Type: Sub-task
          Components: Connect
    Affects Versions: 3.4.0
            Reporter: Hyukjin Kwon


Selecting from a Connect DataFrame built from local rows that contain a Python dict fails while the server reconciles the local relation's schema: the dict column `d` arrives as a struct, but the expected type is a map.

{code}
org.apache.spark.sql.AnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be "MAP<STRING, STRING>".
	at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidColumnOrFieldDataTypeError(QueryCompilationErrors.scala:3179)
	at org.apache.spark.sql.catalyst.plans.logical.Project$.reconcileColumnType(basicLogicalOperators.scala:163)
	at org.apache.spark.sql.catalyst.plans.logical.Project$.$anonfun$reorderFields$1(basicLogicalOperators.scala:203)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.catalyst.plans.logical.Project$.reorderFields(basicLogicalOperators.scala:173)
	at org.apache.spark.sql.catalyst.plans.logical.Project$.matchSchema(basicLogicalOperators.scala:103)
	at org.apache.spark.sql.Dataset.to(Dataset.scala:485)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLocalRelation(SparkConnectPlanner.scala:635)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:83)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformProject(SparkConnectPlanner.scala:678)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:70)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLimit(SparkConnectPlanner.scala:758)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:72)
	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:58)
	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:49)
	at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
	at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
	at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
	at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
	at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

pyspark/sql/tests/test_column.py:126 (ColumnParityTests.test_field_accessor)
self = <pyspark.sql.tests.connect.test_parity_column.ColumnParityTests testMethod=test_field_accessor>

    def test_field_accessor(self):
        df = self.spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})])
>       self.assertEqual(1, df.select(df.l[0]).first()[0])

../test_column.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../connect/dataframe.py:340: in first
    return self.head()
../../connect/dataframe.py:407: in head
    rs = self.head(1)
../../connect/dataframe.py:409: in head
    return self.take(n)
../../connect/dataframe.py:414: in take
    return self.limit(num).collect()
../../connect/dataframe.py:1247: in collect
    table = self._session.client.to_table(query)
../../connect/client.py:415: in to_table
    table, _ = self._execute_and_fetch(req)
../../connect/client.py:593: in _execute_and_fetch
    self._handle_error(rpc_error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7f97b8eff580>
rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "[INVALID_COLUMN_OR_FI...ATA_TYPE] Column or field `d` is of type \"STRUCT<k: STRING>\" while it\'s required to be \"MAP<STRING, STRING>\"."}"
>

    def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
        """
        Error handling helper for dealing with GRPC Errors. On the server side, certain
        exceptions are enriched with additional RPC Status information. These are
        unpacked in this function and put into the exception.
    
        To avoid overloading the user with GRPC errors, this message explicitly
        swallows the error context from the call. This GRPC Error is logged however,
        and can be enabled.
    
        Parameters
        ----------
        rpc_error : grpc.RpcError
           RPC Error containing the details of the exception.
    
        Returns
        -------
        Throws the appropriate internal Python exception.
        """
        logger.exception("GRPC Error received")
        # We have to cast the value here because, a RpcError is a Call as well.
        # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
        status = rpc_status.from_call(cast(grpc.Call, rpc_error))
        if status:
            for d in status.details:
                if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                    info = error_details_pb2.ErrorInfo()
                    d.Unpack(info)
                    if info.reason == "org.apache.spark.sql.AnalysisException":
>                       raise SparkConnectAnalysisException(
                            info.reason, info.metadata["message"], info.metadata["plan"]
                        ) from None
E                       pyspark.sql.connect.client.SparkConnectAnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be "MAP<STRING, STRING>".
E                       Plan:

../../connect/client.py:632: SparkConnectAnalysisException

{code}
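The failing parity test above reduces to the sketch below (assuming a Spark Connect session is already available as {{spark}}; the connection setup is omitted here):

{code}
from pyspark.sql import Row

# Local relation mixing an array column `l`, a struct column `r`,
# and a map column `d` (a Python dict).
df = spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})])

# Any access that forces the local relation through the Connect planner fails:
# while the server reconciles the relation's schema, field `d` arrives as
# STRUCT<k: STRING> but the target schema expects MAP<STRING, STRING>.
df.select(df.l[0]).first()
{code}

The same rows collected through a regular (non-Connect) SparkSession infer `d` as a map, which is the struct/map inconsistency the summary refers to.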


