You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/08 08:03:16 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

zhengruifeng opened a new pull request, #38979:
URL: https://github.com/apache/spark/pull/38979

   ### What changes were proposed in this pull request?
   1, support schema;
   2, support more types: ndarray, list
   
   
   ### Why are the changes needed?
   for API coverage
   
   ### Does this PR introduce _any_ user-facing change?
   yes
   
   
   ### How was this patch tested?
   added types


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng closed pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng closed pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types
URL: https://github.com/apache/spark/pull/38979


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043433892


##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +294,67 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names
+
+        elif isinstance(schema, str):
+            struct = _parse_datatype_string(schema)  # type: ignore[assignment]

Review Comment:
   I'm not sure if this can be used here, as `_parse_datatype_string` internally calls into the JVM. I think we have to add a field to the `LocalRelation` message to pass a schema string instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044332219


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   Will it work, if we use `val proj = UnsafeProjection.create(attributes, inferredSchema .toAttributes)` instead? We will likely also have to add some validation, similar to validation that pyspark currently performs using `_make_type_verifier`.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   Will it work, if we use `val proj = UnsafeProjection.create(attributes, inferredSchema .toAttributes)` instead? We will likely also have to add some validation that is similar to validation that pyspark currently performs using `_make_type_verifier`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043433892


##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +294,67 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names
+
+        elif isinstance(schema, str):
+            struct = _parse_datatype_string(schema)  # type: ignore[assignment]

Review Comment:
   I'm not sure if this can be used here, as `_parse_datatype_string` internally calls into the JVM. I think we have to add a field to the `LocalRelation` message to store the schema string instead, so that the driver can parse it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044328510


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   we need to use `inferredSchema` in `proj`, otherwise it may fail due to datatype mismatch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043125777


##########
python/pyspark/sql/connect/_typing.py:
##########
@@ -26,8 +26,12 @@
 import datetime
 import decimal
 
+from numpy import ndarray as NDArray
+
 from pyspark.sql.connect.column import Column
 
+ArrayLike = NDArray

Review Comment:
   just copy it from pyspark



##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +304,76 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names
+
+        elif isinstance(schema, str):
+            struct = _parse_datatype_string(schema)  # type: ignore[assignment]
+            assert isinstance(struct, StructType)
+            column_names = struct.names
+
+        elif isinstance(schema, (list, tuple)):
+            # Must re-encode any unicode strings to be consistent with StructField names
+            column_names = [x.encode("utf-8") if not isinstance(x, str) else x for x in schema]
+
+        # Create the Pandas DataFrame
+        if isinstance(data, pd.DataFrame):
+            pdf = data
+
+        elif isinstance(data, np.ndarray):
+            # `data` of numpy.ndarray type will be converted to a pandas DataFrame,
+            if data.ndim not in [1, 2]:
+                raise ValueError("NumPy array input should be of 1 or 2 dimensions.")
+
+            pdf = pd.DataFrame(data)
+
+            if len(column_names) == 0:
+                if data.ndim == 1 or data.shape[1] == 1:
+                    column_names = ["value"]
+                else:
+                    column_names = ["_%s" % i for i in range(1, data.shape[1] + 1)]
+
+        else:
+            pdf = pd.DataFrame(list(data))
+
+            if len(column_names) == 0:
+                column_names = ["_%s" % i for i in range(1, pdf.shape[1] + 1)]
+
+        # Adjust the column names
+        if len(column_names) > 0:
+            pdf.columns = column_names
+
+        # Casting according to the input schema
+        if struct is not None:
+            for field in struct.fields:
+                name = field.name
+                dt = field.dataType
+
+                if isinstance(dt, ByteType):

Review Comment:
   let me have a try



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38979:
URL: https://github.com/apache/spark/pull/38979#issuecomment-1344284474

   merged into master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044248636


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +380,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        session.sessionState.sqlParser
+          .parseTableSchema(rel.getDatatypeStr)

Review Comment:
   Yeah, let's handle DDL formatted string together if it's not tricky



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044253256


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +380,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        session.sessionState.sqlParser
+          .parseTableSchema(rel.getDatatypeStr)

Review Comment:
   thanks, updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38979:
URL: https://github.com/apache/spark/pull/38979#issuecomment-1343866751

   @HyukjinKwon @cloud-fan @amaliujia @grundprinzip @hvanhovell 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044259093


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -305,6 +306,17 @@ message LocalRelation {
   // Local collection data serialized into Arrow IPC streaming format which contains
   // the schema of the data.
   bytes data = 1;
+
+  // (Optional) The user provided schema.
+  //
+  // The Sever side will update the column names and data types according to this schema.
+  oneof schema {
+
+    DataType datatype = 2;
+
+    // Server will use Catalyst parser to parse this string to DataType.
+    string datatype_str = 3;

Review Comment:
   I'm think adding support for `_parse_datatype_string` in AnalyzePlan, then we don't need to add `datatype` and `datatype_str` in `LocalRelation` at all.
   
   Then the implementation will be like this (after we implement [`DataFrame.to`](https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py#L1913-L1968)):
   
   ```
   schema = _parse_datatype_string(schema_str)
   return DataFrame.withPlan(LocalRelation(table), self).toDF(*schema.fieldNames).to(schema)
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044271991


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -371,6 +371,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def parseDatatypeString(sqlText: String): DataType = {
+    var dataType: DataType = null
+    try {
+      dataType = session.sessionState.sqlParser.parseTableSchema(sqlText)
+    } catch {
+      case e1: ParseException =>
+        try {
+          dataType = session.sessionState.sqlParser.parseDataType(sqlText)
+        } catch {
+          case e2: ParseException =>
+            dataType = session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
+        }
+    }
+    dataType

Review Comment:
   Nit: This can be simplified, and should throw the original exception.
   ```scala
       val parser = session.sessionState.sqlParser
       try {
         parser.parseTableSchema(sqlText)
       } catch {
         case e: ParseException =>
           try {
             parser.parseDataType(sqlText)
           } catch {
             case _: ParseException =>
               try {
                 parser.parseDataType(s"struct<${sqlText.strip}>")
               } catch {
                 case _: ParseException =>
                   throw e
               }
               
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044352394


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   still fails:
   
   ```
   grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
           status = StatusCode.UNKNOWN
           details = "Couldn't find 0#1215L in [0#1219L,1#1220L,2#1221L,3#1222L]"
           debug_error_string = "{"created":"@1670584625.321243000","description":"Error received from peer ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"Couldn't find 0#1215L in [0#1219L,1#1220L,2#1221L,3#1222L]","grpc_status":2}"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38979:
URL: https://github.com/apache/spark/pull/38979#issuecomment-1342250724

   cc @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044332219


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   Will it work, if we use `val proj = UnsafeProjection.create(attributes, inferredSchema.toAttributes)` instead? We will likely also have to add some validation that is similar to validation that pyspark currently performs using `_make_type_verifier`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044220839


##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +294,71 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        _schema: Optional[StructType] = None
+        _schema_str: Optional[str] = None
+        _cols: Optional[List[str]] = None
+
+        if isinstance(schema, StructType):
+            _schema = schema
+
+        elif isinstance(schema, str):
+            _schema_str = schema
+
+        elif isinstance(schema, (list, tuple)):
+            # Must re-encode any unicode strings to be consistent with StructField names
+            _cols = [x.encode("utf-8") if not isinstance(x, str) else x for x in schema]
+
+        # Create the Pandas DataFrame
+        if isinstance(data, pd.DataFrame):
+            pdf = data
+
+        elif isinstance(data, np.ndarray):
+            # `data` of numpy.ndarray type will be converted to a pandas DataFrame,
+            if data.ndim not in [1, 2]:
+                raise ValueError("NumPy array input should be of 1 or 2 dimensions.")
+
+            pdf = pd.DataFrame(data)
+
+            if _cols is None:
+                if data.ndim == 1 or data.shape[1] == 1:
+                    _cols = ["value"]
+                else:
+                    _cols = ["_%s" % i for i in range(1, data.shape[1] + 1)]
+
+        else:
+            pdf = pd.DataFrame(list(data))
+
+            if _cols is None:
+                _cols = ["_%s" % i for i in range(1, pdf.shape[1] + 1)]
+
+        # Validate number of columns
+        num_cols = pdf.shape[1]
+        if _schema is not None and len(_schema.fields) != num_cols:
+            raise ValueError(
+                f"Length mismatch: Expected axis has {num_cols} elements, "
+                f"new values have {len(_schema.fields)} elements"
+            )
+        elif _cols is not None and len(_cols) != num_cols:
+            raise ValueError(
+                f"Length mismatch: Expected axis has {num_cols} elements, "
+                f"new values have {len(_cols)} elements"
+            )
+
+        table = pa.Table.from_pandas(pdf)
+
+        if _schema is not None:
+            return DataFrame.withPlan(LocalRelation(table, schema=_schema), self)
+        elif _schema_str is not None:
+            return DataFrame.withPlan(LocalRelation(table, schema=_schema_str), self)

Review Comment:
   If we can have a RPC  for `parseTableSchema` in `AnalyzePlan` and implement `DataFrame.to`, then we do not need to add `schema` in `LocalRelation`'s proto, and simplify here with  `DataFrame.withPlan(LocalRelation(table), self).toDF(...).to(...)`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044221212


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +380,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        session.sessionState.sqlParser
+          .parseTableSchema(rel.getDatatypeStr)

Review Comment:
   We need to call `parseDataType` here if `parseTableSchema` fails according to the implementation of `_parse_datatype_string`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043118539


##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +304,76 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names
+
+        elif isinstance(schema, str):
+            struct = _parse_datatype_string(schema)  # type: ignore[assignment]
+            assert isinstance(struct, StructType)
+            column_names = struct.names
+
+        elif isinstance(schema, (list, tuple)):
+            # Must re-encode any unicode strings to be consistent with StructField names
+            column_names = [x.encode("utf-8") if not isinstance(x, str) else x for x in schema]
+
+        # Create the Pandas DataFrame
+        if isinstance(data, pd.DataFrame):
+            pdf = data
+
+        elif isinstance(data, np.ndarray):
+            # `data` of numpy.ndarray type will be converted to a pandas DataFrame,
+            if data.ndim not in [1, 2]:
+                raise ValueError("NumPy array input should be of 1 or 2 dimensions.")
+
+            pdf = pd.DataFrame(data)
+
+            if len(column_names) == 0:
+                if data.ndim == 1 or data.shape[1] == 1:
+                    column_names = ["value"]
+                else:
+                    column_names = ["_%s" % i for i in range(1, data.shape[1] + 1)]
+
+        else:
+            pdf = pd.DataFrame(list(data))
+
+            if len(column_names) == 0:
+                column_names = ["_%s" % i for i in range(1, pdf.shape[1] + 1)]
+
+        # Adjust the column names
+        if len(column_names) > 0:
+            pdf.columns = column_names
+
+        # Casting according to the input schema
+        if struct is not None:
+            for field in struct.fields:
+                name = field.name
+                dt = field.dataType
+
+                if isinstance(dt, ByteType):

Review Comment:
   I think `_to_corrected_pandas_type`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38979:
URL: https://github.com/apache/spark/pull/38979#issuecomment-1343806000

   difference in casting:
   this PR leverages `Dataset.to(schema)` to cast datatypes, which is very different from the pyspark's approach which relies on [the `_acceptable_types` list](https://github.com/apache/spark/blob/master/python/pyspark/sql/types.py#L1755-L1775)
   
   `createDataFrame([[1, 2, 3, 4]], schema="col1 int, col2 int, col3 int, col4 double")` runs successfully in Connect, while it fails in PySpark:
   
   
   ```
   Traceback (most recent call last):
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/connect/test_connect_basic.py", line 299, in test_with_local_list
       self.spark.createDataFrame([[1, 2, 3, 4]], schema="col1 int, col2 int, col3 int, col4 double")
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/session.py", line 1164, in createDataFrame
       return self._create_dataframe(
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/session.py", line 1206, in _create_dataframe
       rdd, struct = self._createFromLocal(map(prepare, data), schema)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/session.py", line 850, in _createFromLocal
       data = list(data)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/session.py", line 1180, in prepare
       verify_func(obj)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 2003, in verify
       verify_value(obj)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 1981, in verify_struct
       verifier(v)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 2003, in verify
       verify_value(obj)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 1997, in verify_default
       verify_acceptable_types(obj)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 1873, in verify_acceptable_types
       raise TypeError(
   TypeError: field col4: DoubleType() can not accept object 4 in type <class 'int'>
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044122515


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -304,6 +305,24 @@ message LocalRelation {
   // Local collection data serialized into Arrow IPC streaming format which contains
   // the schema of the data.
   bytes data = 1;
+
+  // (Optional) The user provided schema.
+  //
+  // The Sever side will update the column names and data types according to this schema.
+  oneof schema {
+
+    DataType datatype = 2;
+
+    // Server will use Catalyst parser to parse this string to DataType.
+    string datatype_str = 3;
+
+    // Column names
+    StringList cols = 4;

Review Comment:
   I am a bit confused on this one: what would be the types for such column names? Are the types inferred but the names are provided? Can the behavior be documented here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044197926


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -304,6 +305,24 @@ message LocalRelation {
   // Local collection data serialized into Arrow IPC streaming format which contains
   // the schema of the data.
   bytes data = 1;
+
+  // (Optional) The user provided schema.
+  //
+  // The Sever side will update the column names and data types according to this schema.
+  oneof schema {
+
+    DataType datatype = 2;
+
+    // Server will use Catalyst parser to parse this string to DataType.
+    string datatype_str = 3;
+
+    // Column names
+    StringList cols = 4;

Review Comment:
   they are used for renaming, I'm think removing it and directly use `toDF` in Client



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38979:
URL: https://github.com/apache/spark/pull/38979#issuecomment-1344282905

   all tests passed, let me merge it now. Thanks for the reviews


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38979:
URL: https://github.com/apache/spark/pull/38979#issuecomment-1342247448

   PySpark `createDataFrame` infer and validate the data types, create RDD from list, and directly assign the sql schema in JVM. And there are many related configurations including
   ```
   self._jconf.inferDictAsStruct()
   self._jconf.sessionLocalTimeZone()
   self._jconf.arrowPySparkEnabled()
   self._jconf.arrowPySparkFallbackEnabled()
   self._jconf.arrowMaxRecordsPerBatch()
   self._jconf.arrowSafeTypeConversion()
   self._jconf.legacyInferArrayTypeFromFirstElement()
   is_timestamp_ntz_preferred()
   ...
   ```
   
   In Connect, datasets are always convert to a Pandas DataFrame (internally a PyArrow Table). So I simply use `pd.DataFrame(list(data))` for infer its datatypes.
   
   The two approaches are so different that I am afraid it is hard to 100% match PySpark's `createDataFrame`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043185541


##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +294,67 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names
+
+        elif isinstance(schema, str):
+            struct = _parse_datatype_string(schema)  # type: ignore[assignment]
+            assert isinstance(struct, StructType)
+            column_names = struct.names
+
+        elif isinstance(schema, (list, tuple)):
+            # Must re-encode any unicode strings to be consistent with StructField names
+            column_names = [x.encode("utf-8") if not isinstance(x, str) else x for x in schema]
+
+        # Create the Pandas DataFrame
+        if isinstance(data, pd.DataFrame):
+            pdf = data
+
+        elif isinstance(data, np.ndarray):
+            # `data` of numpy.ndarray type will be converted to a pandas DataFrame,
+            if data.ndim not in [1, 2]:
+                raise ValueError("NumPy array input should be of 1 or 2 dimensions.")
+
+            pdf = pd.DataFrame(data)
+
+            if len(column_names) == 0:
+                if data.ndim == 1 or data.shape[1] == 1:
+                    column_names = ["value"]
+                else:
+                    column_names = ["_%s" % i for i in range(1, data.shape[1] + 1)]
+
+        else:
+            pdf = pd.DataFrame(list(data))
+
+            if len(column_names) == 0:
+                column_names = ["_%s" % i for i in range(1, pdf.shape[1] + 1)]
+
+        # Adjust the column names
+        if len(column_names) > 0:
+            pdf.columns = column_names
+
+        # Casting according to the input schema
+        if struct is not None:
+            for field in struct.fields:
+                name = field.name
+                dt = field.dataType
+                if isinstance(dt, StringType):
+                    pdf[name] = pdf[name].apply(str)
+                else:
+                    pt = PandasConversionMixin._to_corrected_pandas_type(dt)

Review Comment:
   I attempted to make `_to_corrected_pandas_type` support `StringType` by returning `np.str_`
   
   then the `createDataFrame` related tests pass as expected, but some other pyspark tests become weird. So check `isinstance(dt, StringType)` here.
   
    In the future, I think we should directly create PyArrow Table from ndarray and list, to skip the intermediate conversions to/from pandas.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044063886


##########
python/pyspark/sql/connect/plan.py:
##########
@@ -167,21 +169,38 @@ def _repr_html_(self) -> str:
 
 
 class LocalRelation(LogicalPlan):
-    """Creates a LocalRelation plan object based on a Pandas DataFrame."""
+    """Creates a LocalRelation plan object based on a PyArrow Table."""
 
-    def __init__(self, pdf: "pandas.DataFrame") -> None:
+    def __init__(
+        self,
+        table: "pa.Table",
+        schema: Optional[Union[DataType, str, Sequence[str]]] = None,
+    ) -> None:
         super().__init__(None)
-        self._pdf = pdf
+        assert table is not None and isinstance(table, pa.Table)
+        self._table = table
+
+        if schema is not None:
+            assert isinstance(schema, (DataType, str, list))
+            if isinstance(schema, list):
+                assert all(isinstance(c, str) for c in schema)
+        self._schema = schema
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         sink = pa.BufferOutputStream()
-        table = pa.Table.from_pandas(self._pdf)
-        with pa.ipc.new_stream(sink, table.schema) as writer:
-            for b in table.to_batches():
+        with pa.ipc.new_stream(sink, self._table.schema) as writer:
+            for b in self._table.to_batches():
                 writer.write_batch(b)
 
         plan = proto.Relation()
         plan.local_relation.data = sink.getvalue().to_pybytes()
+        if self._schema is not None:
+            if isinstance(self._schema, DataType):
+                plan.local_relation.datatype.CopyFrom(pyspark_types_to_proto_types(self._schema))

Review Comment:
   `pyspark_types_to_proto_types` does not support `StructType` now.
   I'm going to fix it in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044247543


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -305,6 +306,17 @@ message LocalRelation {
   // Local collection data serialized into Arrow IPC streaming format which contains
   // the schema of the data.
   bytes data = 1;
+
+  // (Optional) The user provided schema.
+  //
+  // The Sever side will update the column names and data types according to this schema.
+  oneof schema {
+
+    DataType datatype = 2;
+
+    // Server will use Catalyst parser to parse this string to DataType.
+    string datatype_str = 3;

Review Comment:
   Or we can always pass string implementation for now (by turning DataType to a JSON representation), `DataType.json()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044358914


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   seems due to unresolved



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -371,6 +371,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def parseDatatypeString(sqlText: String): DataType = {
+    var dataType: DataType = null
+    try {
+      dataType = session.sessionState.sqlParser.parseTableSchema(sqlText)
+    } catch {
+      case e1: ParseException =>
+        try {
+          dataType = session.sessionState.sqlParser.parseDataType(sqlText)
+        } catch {
+          case e2: ParseException =>
+            dataType = session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
+        }
+    }
+    dataType

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044247543


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -305,6 +306,17 @@ message LocalRelation {
   // Local collection data serialized into Arrow IPC streaming format which contains
   // the schema of the data.
   bytes data = 1;
+
+  // (Optional) The user provided schema.
+  //
+  // The Sever side will update the column names and data types according to this schema.
+  oneof schema {
+
+    DataType datatype = 2;
+
+    // Server will use Catalyst parser to parse this string to DataType.
+    string datatype_str = 3;

Review Comment:
   Or we can always pass string implementation for now (by turning DataType to a JSON representation)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044271450


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   This throws away the names of nested fields. Perhaps we can do something like the following instead:
   ```scala
       val (rows, inferredSchema) = ArrowConverters.fromBatchWithSchemaIterator(
         Iterator(rel.getData.toByteArray),
         TaskContext.get())
       if (inferredSchema == null) {
         throw InvalidPlanInput(s"Input data for LocalRelation does not produce a schema.")
       }
   
       val schemaType = if (rel.hasDataType) {
         DataTypeProtoConverter.toCatalystType(rel.getDataType)
       } else if (rel.hasDataTypeString) {
         parseDatatypeString(rel.getDataTypeString)
       } else {
         inferredSchema
       }
   
       val schemaStruct = schemaType match {
         case s: StructType => s
         case d => StructType(Seq(StructField("value", d)))
       }
   
       val attributes = schemaStruct.toAttributes
       val proj = UnsafeProjection.create(attributes, attributes)
       new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
   ```



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]

Review Comment:
   This is not guaranteed to be a `StructType`. `createDataFrame` also allows `AtomicType` to be used, and in that case `"value"` will be used as the column name.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -371,6 +371,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def parseDatatypeString(sqlText: String): DataType = {
+    var dataType: DataType = null
+    try {
+      dataType = session.sessionState.sqlParser.parseTableSchema(sqlText)
+    } catch {
+      case e1: ParseException =>
+        try {
+          dataType = session.sessionState.sqlParser.parseDataType(sqlText)
+        } catch {
+          case e2: ParseException =>
+            dataType = session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
+        }
+    }
+    dataType

Review Comment:
   Nit: This can be simplified.
   ```scala
       try {
         session.sessionState.sqlParser.parseTableSchema(sqlText)
       } catch {
         case _: ParseException =>
           try {
             session.sessionState.sqlParser.parseDataType(sqlText)
           } catch {
             case _: ParseException =>
               session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
           }
       }
   ```



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]

Review Comment:
   Same issue here, this can be any `DataType`, not just a `StructType`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044406797


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   Hmmm, this is going to more difficult than I anticipated. We will have to check if the inferred type and the provided type are compatible, and then we'll have to use something similar to `EvaluatePython.makeFromJava` to perform the conversion. Let's do this in a follow-up :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043122774


##########
python/pyspark/sql/connect/_typing.py:
##########
@@ -26,8 +26,12 @@
 import datetime
 import decimal
 
+from numpy import ndarray as NDArray
+
 from pyspark.sql.connect.column import Column
 
+ArrayLike = NDArray

Review Comment:
   hm, should we do this alias?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043149562


##########
python/pyspark/sql/connect/_typing.py:
##########
@@ -26,8 +26,12 @@
 import datetime
 import decimal
 
+from numpy import ndarray as NDArray
+
 from pyspark.sql.connect.column import Column
 
+ArrayLike = NDArray

Review Comment:
   let me remove it, since it's now used only once



##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +304,76 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names
+
+        elif isinstance(schema, str):
+            struct = _parse_datatype_string(schema)  # type: ignore[assignment]
+            assert isinstance(struct, StructType)
+            column_names = struct.names
+
+        elif isinstance(schema, (list, tuple)):
+            # Must re-encode any unicode strings to be consistent with StructField names
+            column_names = [x.encode("utf-8") if not isinstance(x, str) else x for x in schema]
+
+        # Create the Pandas DataFrame
+        if isinstance(data, pd.DataFrame):
+            pdf = data
+
+        elif isinstance(data, np.ndarray):
+            # `data` of numpy.ndarray type will be converted to a pandas DataFrame,
+            if data.ndim not in [1, 2]:
+                raise ValueError("NumPy array input should be of 1 or 2 dimensions.")
+
+            pdf = pd.DataFrame(data)
+
+            if len(column_names) == 0:
+                if data.ndim == 1 or data.shape[1] == 1:
+                    column_names = ["value"]
+                else:
+                    column_names = ["_%s" % i for i in range(1, data.shape[1] + 1)]
+
+        else:
+            pdf = pd.DataFrame(list(data))
+
+            if len(column_names) == 0:
+                column_names = ["_%s" % i for i in range(1, pdf.shape[1] + 1)]
+
+        # Adjust the column names
+        if len(column_names) > 0:
+            pdf.columns = column_names
+
+        # Casting according to the input schema
+        if struct is not None:
+            for field in struct.fields:
+                name = field.name
+                dt = field.dataType
+
+                if isinstance(dt, ByteType):

Review Comment:
   great! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tomvanbussel commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043433892


##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +294,67 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names
+
+        elif isinstance(schema, str):
+            struct = _parse_datatype_string(schema)  # type: ignore[assignment]

Review Comment:
   I'm not sure if this can be used here, as `_parse_datatype_string` internally calls into the JVM. I think we have to add an option to the `LocalRelation` to pass a schema string instead.



##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +294,67 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names

Review Comment:
   This will ignore the names of nested fields, and it will ignore the types. To me it seems that we should leave the Pandas DataFrame untouched here, and instead pass the schema struct in the `LocationRelation` message to the driver.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38979: [SPARK-41446][CONNECT][PYTHON] Make `createDataFrame` support schema and more input dataset types

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1043949446


##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +294,67 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        struct: Optional[StructType] = None
+        column_names: List[str] = []
+
+        if isinstance(schema, StructType):
+            struct = schema
+            column_names = struct.names
+
+        elif isinstance(schema, str):
+            struct = _parse_datatype_string(schema)  # type: ignore[assignment]

Review Comment:
   you are right, we should not call `_parse_datatype_string`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org