Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:20:11 UTC

[jira] [Updated] (SPARK-16517) can't add columns on the table create by spark'writer

     [ https://issues.apache.org/jira/browse/SPARK-16517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-16517:
---------------------------------
    Labels: bulk-closed  (was: )

> can't add columns on the table  create by spark'writer
> ------------------------------------------------------
>
>                 Key: SPARK-16517
>                 URL: https://issues.apache.org/jira/browse/SPARK-16517
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2
>            Reporter: lichenglin
>            Priority: Major
>              Labels: bulk-closed
>
> {code}
> setName("abc");
> HiveContext hive = getHiveContext();
> // Create the initial table with three columns, partitioned by "v".
> DataFrame d = hive.createDataFrame(
>     getJavaSparkContext().parallelize(Arrays.asList(
>         RowFactory.create("abc", "abc", 5.0),
>         RowFactory.create("abcd", "abcd", 5.0))),
>     DataTypes.createStructType(Arrays.asList(
>         DataTypes.createStructField("card_id", DataTypes.StringType, true),
>         DataTypes.createStructField("tag_name", DataTypes.StringType, true),
>         DataTypes.createStructField("v", DataTypes.DoubleType, true))));
> d.write().partitionBy("v").mode(SaveMode.Overwrite).saveAsTable("abc");
> // Add a column through SQL, then refresh the cached table metadata.
> hive.sql("alter table abc add columns(v2 double)");
> hive.refreshTable("abc");
> hive.sql("describe abc").show();
> // Append rows that include the new column v2; the stack trace below points at this write.
> DataFrame d2 = hive.createDataFrame(
>     getJavaSparkContext().parallelize(Arrays.asList(
>         RowFactory.create("abc", "abc", 3.0, 4.0),
>         RowFactory.create("abcd", "abcd", 3.0, 1.0))),
>     new StructType(new StructField[] {
>         DataTypes.createStructField("card_id", DataTypes.StringType, true),
>         DataTypes.createStructField("tag_name", DataTypes.StringType, true),
>         DataTypes.createStructField("v", DataTypes.DoubleType, true),
>         DataTypes.createStructField("v2", DataTypes.DoubleType, true) }));
> d2.write().partitionBy("v").mode(SaveMode.Append).saveAsTable("abc");
> hive.table("abc").show();
> {code}
> spark.sql.parquet.mergeSchema has been set to "true".
> The code produces the following output and exception:
> {code}
> +--------+---------+-------+
> |col_name|data_type|comment|
> +--------+---------+-------+
> | card_id|   string|       |
> |tag_name|   string|       |
> |       v|   double|       |
> +--------+---------+-------+
> 2016-07-13 13:40:43,637 INFO [org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : db=default tbl=abc
> 2016-07-13 13:40:43,637 INFO [org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl	ip=unknown-ip-addr	cmd=get_table : db=default tbl=abc	
> 2016-07-13 13:40:43,693 INFO [org.apache.spark.storage.BlockManagerInfo:58] - Removed broadcast_2_piece0 on localhost:50647 in memory (size: 1176.0 B, free: 1125.7 MB)
> 2016-07-13 13:40:43,700 INFO [org.apache.spark.ContextCleaner:58] - Cleaned accumulator 2
> 2016-07-13 13:40:43,702 INFO [org.apache.spark.storage.BlockManagerInfo:58] - Removed broadcast_1_piece0 on localhost:50647 in memory (size: 19.4 KB, free: 1125.7 MB)
> 2016-07-13 13:40:43,702 INFO [org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : db=default tbl=abc
> 2016-07-13 13:40:43,703 INFO [org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl	ip=unknown-ip-addr	cmd=get_table : db=default tbl=abc	
> Exception in thread "main" java.lang.RuntimeException: Relation[card_id#26,tag_name#27,v#28] ParquetRelation
>  requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE statement generates the same number of columns as its schema.
> 	at scala.sys.package$.error(package.scala:27)
> 	at org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:68)
> 	at org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:58)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
> 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
> 	at org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:58)
> 	at org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:57)
> 	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
> 	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
> 	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
> 	at scala.collection.immutable.List.foldLeft(List.scala:84)
> 	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
> 	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
> 	at scala.collection.immutable.List.foreach(List.scala:318)
> 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
> 	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
> 	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
> 	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
> 	at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:39)
> 	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:38)
> 	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:43)
> 	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:43)
> 	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
> 	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
> 	at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
> 	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
> 	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
> 	at CubeDemoTest.main(CubeDemoTest.java:52)
> {code}
> The metadata is stored in MySQL. Here are the columns_v2 rows for table abc:
> {code}
> 447	from deserializer	col	array<string>	0
> 447		v2	double	1
> {code}
> The statement "alter table abc add columns(v2 double)" wrote the metadata for the new column v2 into MySQL,
> but Spark SQL does not read it correctly.
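
Editor's illustration (not part of the original report): the stack trace shows the append being rejected inside PreInsertCastAndRename, and the error message says the query must generate the same number of columns as the relation's schema. The plain-Java sketch below models that comparison with the two schemas from the report: the three columns Spark SQL still resolves for table abc (per the describe output) and the four columns of d2. The class name InsertArityCheck and the method checkSameColumnCount are hypothetical names chosen for this sketch; they are not Spark APIs, and the real rule operates on logical plans rather than string lists.

```java
import java.util.Arrays;
import java.util.List;

public class InsertArityCheck {

    /**
     * Simplified stand-in for the pre-insert check reported in the stack trace.
     * Throws if the target table and the inserted query differ in column count.
     */
    public static void checkSameColumnCount(List<String> tableColumns, List<String> queryColumns) {
        if (tableColumns.size() != queryColumns.size()) {
            throw new IllegalStateException(
                "Relation" + tableColumns
                    + " requires the query to generate the same number of columns as its schema,"
                    + " but the query has " + queryColumns.size());
        }
    }

    public static void main(String[] args) {
        // Columns Spark SQL resolves for table abc after the ALTER TABLE (from the describe output).
        List<String> resolved = Arrays.asList("card_id", "tag_name", "v");
        // Columns of the appended DataFrame d2, which includes the new column v2.
        List<String> appended = Arrays.asList("card_id", "tag_name", "v", "v2");

        try {
            checkSameColumnCount(resolved, appended);
            System.out.println("append accepted");
        } catch (IllegalStateException e) {
            System.out.println("append rejected: " + e.getMessage());
        }
    }
}
```

With the schemas from the report, the sketch takes the "append rejected" branch, because the resolved table schema has three columns and the appended data has four. This is consistent with the reporter's observation that v2 reached the metastore but is not visible in the schema Spark SQL uses for the table.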


