You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by "Driesprong, Fokko" <fo...@driesprong.frl> on 2019/12/20 15:00:19 UTC

Folks,

I opened a PR a while ago that adds the possibility to merge a custom data
type into a native data type. This is something new because of the
introduction of Delta.

To give some background, I have a Dataset that has fields of the type
XMLGregorianCalendarType. I don't care about this type and would like to
convert it to a standard data type, mainly because, if I read the data
again using another job, that job needs to have the custom data type
registered, which is not possible in the SQL API. The magic bit here is
that I'm overriding jsonValue to drop the information about the custom
data type. In this case, you have to make sure that it is serialized as a
normal timestamp.

Before Delta, appending to the table would go fine because Spark did not
check schema compatibility on write. Now with Delta, things are different.
When writing, Delta checks whether the two schemas can be merged, and fails:

OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support
was removed in 8.0
Warning: Ignoring non-spark config property:
eventLog.rolloverIntervalSeconds=3600
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed
to merge fields 'EventTimestamp' and 'EventTimestamp'. Failed to merge
incompatible data types TimestampType and
org.apache.spark.sql.types.CustomXMLGregorianCalendarType@6334178e;;
at
com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:685)
at
com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:674)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.com$databricks$sql$transaction$tahoe$schema$SchemaUtils$$merge$1(SchemaUtils.scala:674)
at
com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:750)
at
com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:63)
at
com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:50)
at
com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:90)
at
com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:119)
at
com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:93)
at
com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:405)
at
com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:235)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:230)
at
com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
at
com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:272)
at
com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
at
com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:386)
at
com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
at
com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
at
com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
at
com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:67)
at
com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
at
com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:342)
at
com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:45)
at
com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
at
com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:45)
at
com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:93)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:115)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
at
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at
org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
at
org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:508)
at
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:483)
at
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:430)
at com.ahold.IngestFild$.writeUnmanagedTable(IngestFild.scala:49)
at com.ahold.IngestFild$.ingestFild(IngestFild.scala:69)
at com.ahold.IngestFild$.main(IngestFild.scala:31)
at com.ahold.IngestFild.main(IngestFild.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org
$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Currently, the merge function does not support UDTs. Therefore I've
extended the rules. The PR has been quiet for the last few weeks. Is this
something that we can merge into Spark? I would like to get your opinion on
this.

 Cheers, Fokko

Re:

Posted by "Driesprong, Fokko" <fo...@driesprong.frl>.
Does anyone have an opinion on this? A link to the PR:
https://github.com/apache/spark/pull/26644

Cheers, Fokko


Op vr 20 dec. 2019 om 16:00 schreef Driesprong, Fokko <fokko@driesprong.frl
>:

> Folks,
>
> I've opened a PR a while ago with a PR to merge the possibility to merge
> a custom data type, into a native data type. This is something new because
> of the introduction of Delta.
>
> To have some background, I'm having a DataSet that has fields of the type
> XMLGregorianCalendarType. I don't care about this type and would like to
> convert this to a standard data type. Mainly because, if I'm reading the
> data again using another job, it needs to have the customer data type being
> registered, which is not possible in the SQL API. The magic bit here is
> that I'm overriding the jsonValue to lose the information about the custom
> data type. In this case, you have to make sure that it is serialized as the
> normal timestamp.
>
> Before Delta, when appending to the table, everything would go fine
> because it would not check compatibility on write. Now with Delta, things
> are different. When writing, it will check if the two structures can be
> merged:
>
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m;
> support was removed in 8.0
> Warning: Ignoring non-spark config property:
> eventLog.rolloverIntervalSeconds=3600
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed
> to merge fields 'EventTimestamp' and 'EventTimestamp'. Failed to merge
> incompatible data types TimestampType and
> org.apache.spark.sql.types.CustomXMLGregorianCalendarType@6334178e;;
> at
> com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:685)
> at
> com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:674)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.com$databricks$sql$transaction$tahoe$schema$SchemaUtils$$merge$1(SchemaUtils.scala:674)
> at
> com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:750)
> at
> com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:63)
> at
> com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:50)
> at
> com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:90)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:119)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:93)
> at
> com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:405)
> at
> com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:235)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:230)
> at
> com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
> at
> com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:272)
> at
> com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
> at
> com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:386)
> at
> com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
> at
> com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
> at
> com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
> at
> com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:67)
> at
> com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
> at
> com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:342)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:45)
> at
> com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:45)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:93)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:115)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
> at
> org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
> at
> org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:508)
> at
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:483)
> at
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:430)
> at com.ahold.IngestFild$.writeUnmanagedTable(IngestFild.scala:49)
> at com.ahold.IngestFild$.ingestFild(IngestFild.scala:69)
> at com.ahold.IngestFild$.main(IngestFild.scala:31)
> at com.ahold.IngestFild.main(IngestFild.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Currently, the merge function does not support UDT's. Therefore I've
> extended the rules. Last few weeks it was quiet at the PR. Is this
> something that we can merge into Spark? I would like to get your opinion on
> this.
>
>  Cheers, Fokko
>