Posted to jira@arrow.apache.org by "Steve Suh (Jira)" <ji...@apache.org> on 2020/09/04 21:13:00 UTC

[jira] [Comment Edited] (ARROW-6972) [C#] Should support StructField arrays

    [ https://issues.apache.org/jira/browse/ARROW-6972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190918#comment-17190918 ] 

Steve Suh edited comment on ARROW-6972 at 9/4/20, 9:12 PM:
-----------------------------------------------------------

We are currently trying to support [Spark 3.0|https://github.com/apache/spark] in [.NET for Apache Spark|https://github.com/dotnet/spark]. We are tracking the work in [issue 654|https://github.com/dotnet/spark/issues/654]. To support this, we need to be able to create a StructArray with StructType/Fields. However, when we attempt to create one, we run into the following exception:

ProcessStream() failed with exception: *System.NotImplementedException: The method or operation is not implemented.*
   at Apache.Arrow.Ipc.ArrowTypeFlatbufferBuilder.TypeVisitor.Visit(IArrowType type)
   at Apache.Arrow.Types.ArrowType.Accept[T](T type, IArrowTypeVisitor visitor)
   at Apache.Arrow.Types.StructType.Accept(IArrowTypeVisitor visitor)
   at Apache.Arrow.Ipc.ArrowStreamWriter.SerializeSchema(Schema schema)
   at Apache.Arrow.Ipc.ArrowStreamWriter.WriteSchemaAsync(Schema schema, CancellationToken cancellationToken)
   at Apache.Arrow.Ipc.ArrowStreamWriter.WriteRecordBatchInternalAsync(RecordBatch recordBatch, CancellationToken cancellationToken)
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameGroupedMapCommandExecutor.ExecuteArrowGroupedMapCommand(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 770
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameGroupedMapCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 740
   at Microsoft.Spark.Worker.Command.SqlCommandExecutor.Execute(Stream inputStream, Stream outputStream, PythonEvalType evalType, SqlCommand[] commands) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 75
   at Microsoft.Spark.Worker.Command.CommandExecutor.Execute(Stream inputStream, Stream outputStream, Int32 splitIndex, CommandPayload commandPayload) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\Command\CommandExecutor.cs:line 57
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 153
[2020-09-04T18:57:09.0737772Z] [UDON] [Error] [TaskRunner] [0] Exiting with exception: System.NotImplementedException: The method or operation is not implemented.
   at Apache.Arrow.Ipc.ArrowTypeFlatbufferBuilder.TypeVisitor.Visit(IArrowType type)
   at Apache.Arrow.Types.ArrowType.Accept[T](T type, IArrowTypeVisitor visitor)
   at Apache.Arrow.Types.StructType.Accept(IArrowTypeVisitor visitor)
   at Apache.Arrow.Ipc.ArrowStreamWriter.SerializeSchema(Schema schema)
   at Apache.Arrow.Ipc.ArrowStreamWriter.WriteSchemaAsync(Schema schema, CancellationToken cancellationToken)
   at Apache.Arrow.Ipc.ArrowStreamWriter.WriteRecordBatchInternalAsync(RecordBatch recordBatch, CancellationToken cancellationToken)
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameGroupedMapCommandExecutor.ExecuteArrowGroupedMapCommand(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 770
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameGroupedMapCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 740
   at Microsoft.Spark.Worker.Command.SqlCommandExecutor.Execute(Stream inputStream, Stream outputStream, PythonEvalType evalType, SqlCommand[] commands) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 75
   at Microsoft.Spark.Worker.Command.CommandExecutor.Execute(Stream inputStream, Stream outputStream, Int32 splitIndex, CommandPayload commandPayload) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\Command\CommandExecutor.cs:line 57
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 153
   at Microsoft.Spark.Worker.TaskRunner.Run() in D:\Source\Repos\github\suhsteve\spark\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 65
20/09/04 11:57:09 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
        at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
        at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:163)
        at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:169)
        at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:160)
        at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
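
The top frames of the .NET trace (StructType.Accept, then ArrowType.Accept[T], then TypeVisitor.Visit(IArrowType)) are consistent with a schema-serializing visitor that has no StructType-specific handler and so lands in a generic fallback that throws. The sketch below models that dispatch pattern in Python. All class and method names are hypothetical and chosen only to echo the trace; this is not the Apache.Arrow source.

```python
class ArrowType:
    """Base type. accept() models the double dispatch seen in the trace."""

    def accept(self, visitor):
        # Look for a type-specific handler, e.g. visit_Int32Type.
        handler = getattr(visitor, f"visit_{type(self).__name__}", None)
        if handler is not None:
            return handler(self)
        # No specific handler exists: fall back to the generic visit().
        return visitor.visit(self)


class Int32Type(ArrowType):
    pass


class StructType(ArrowType):
    def __init__(self, fields):
        self.fields = fields  # list of (name, ArrowType) pairs


class SchemaTypeVisitor:
    """Hypothetical serializer visitor that only knows Int32Type."""

    def visit_Int32Type(self, arrow_type):
        return "Int(bitWidth=32)"

    def visit(self, arrow_type):
        # Generic fallback, reached for every type without a handler.
        raise NotImplementedError(f"no handler for {type(arrow_type).__name__}")


def serialize_schema(fields, visitor):
    """Serialize each (name, type) pair by dispatching through accept()."""
    return [(name, arrow_type.accept(visitor)) for name, arrow_type in fields]
```

Under this model, serializing a schema of primitive fields succeeds, while any schema containing a StructType raises from the fallback before a single byte of the schema is written. Supporting structs then amounts to adding a struct-specific handler that also serializes the child fields.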


> [C#] Should support StructField arrays
> --------------------------------------
>
>                 Key: ARROW-6972
>                 URL: https://issues.apache.org/jira/browse/ARROW-6972
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: C#
>            Reporter: Cameron Murray
>            Priority: Major
>
> The C# implementation of Arrow does not support struct arrays or, more generally, complex types.
>  I notice ARROW-6870 addresses Dictionary arrays; however, these are not as flexible as structs (for example, they cannot mix data types).
> The source does have a stub for StructArray, but there is no Builder and no example of how to use it, so I assume it is not supported.
>  
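
To make the "mix data types" point concrete: in the Arrow columnar format a struct array is stored as one child array per field, all of equal length, plus a validity bitmap for the struct itself. The following is a minimal pure-Python model of that layout. The StructColumn class and its members are hypothetical illustrations, not part of any Arrow library.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class StructColumn:
    names: List[str]           # field names, one per child array
    children: List[Sequence]   # child arrays; element types may differ per field
    validity: List[bool]       # False marks a null struct at that row

    def __post_init__(self):
        # Every child array and the validity list must have the same length.
        lengths = {len(child) for child in self.children} | {len(self.validity)}
        if len(self.names) != len(self.children) or len(lengths) != 1:
            raise ValueError("children and validity must align")

    def row(self, i: int) -> Optional[dict]:
        """Assemble row i from the child arrays, or None for a null struct."""
        if not self.validity[i]:
            return None
        return {name: child[i] for name, child in zip(self.names, self.children)}


col = StructColumn(
    names=["id", "name"],
    children=[[1, 2, 3], ["a", "b", "c"]],  # an integer child and a string child
    validity=[True, False, True],
)
print(col.row(0))  # {'id': 1, 'name': 'a'}
print(col.row(1))  # None
```

Each field keeps its own type, which is what a single dictionary-encoded array cannot express.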



--
This message was sent by Atlassian Jira
(v8.3.4#803005)