Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:36:03 UTC
[jira] [Resolved] (SPARK-16517) can't add columns on the table created by Spark's writer
[ https://issues.apache.org/jira/browse/SPARK-16517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-16517.
----------------------------------
Resolution: Incomplete
> can't add columns on the table created by Spark's writer
> ------------------------------------------------------
>
> Key: SPARK-16517
> URL: https://issues.apache.org/jira/browse/SPARK-16517
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.2
> Reporter: lichenglin
> Priority: Major
> Labels: bulk-closed
>
> {code}
> // Reproduction from the report (fragment of CubeDemoTest): create a partitioned
> // data source table, add a column through Hive DDL, then try to append rows
> // that include the new column.
> setName("abc");
> HiveContext hive = getHiveContext();
>
> // 1. Write the initial table with columns card_id, tag_name and the partition column v.
> DataFrame d = hive.createDataFrame(
>     getJavaSparkContext().parallelize(
>         Arrays.asList(RowFactory.create("abc", "abc", 5.0),
>             RowFactory.create("abcd", "abcd", 5.0))),
>     DataTypes.createStructType(
>         Arrays.asList(
>             DataTypes.createStructField("card_id", DataTypes.StringType, true),
>             DataTypes.createStructField("tag_name", DataTypes.StringType, true),
>             DataTypes.createStructField("v", DataTypes.DoubleType, true))));
> d.write().partitionBy("v").mode(SaveMode.Overwrite).saveAsTable("abc");
>
> // 2. Add a new column v2 through Hive DDL and refresh the cached metadata.
> hive.sql("alter table abc add columns(v2 double)");
> hive.refreshTable("abc");
> hive.sql("describe abc").show();
>
> // 3. Append rows that carry the new v2 column; this is where the failure occurs.
> DataFrame d2 = hive.createDataFrame(
>     getJavaSparkContext().parallelize(
>         Arrays.asList(RowFactory.create("abc", "abc", 3.0, 4.0),
>             RowFactory.create("abcd", "abcd", 3.0, 1.0))),
>     new StructType(new StructField[] {
>         DataTypes.createStructField("card_id", DataTypes.StringType, true),
>         DataTypes.createStructField("tag_name", DataTypes.StringType, true),
>         DataTypes.createStructField("v", DataTypes.DoubleType, true),
>         DataTypes.createStructField("v2", DataTypes.DoubleType, true) }));
> d2.write().partitionBy("v").mode(SaveMode.Append).saveAsTable("abc");
> hive.table("abc").show();
> {code}
> spark.sql.parquet.mergeSchema has been set to "true".
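> (Not part of the original report: a minimal sketch of setting that flag programmatically on the same HiveContext used in the snippet above; whether it was set this way or via spark-defaults.conf / --conf on spark-submit in this run is an assumption.)
> {code}
> // Assumption: mergeSchema was enabled on the context used in the reproduction;
> // equivalent to spark.sql.parquet.mergeSchema=true in spark-defaults.conf.
> hive.setConf("spark.sql.parquet.mergeSchema", "true");
> {code}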
> The output and the exception produced by the code are shown below:
> {code}
> +--------+---------+-------+
> |col_name|data_type|comment|
> +--------+---------+-------+
> | card_id| string| |
> |tag_name| string| |
> | v| double| |
> +--------+---------+-------+
> 2016-07-13 13:40:43,637 INFO [org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : db=default tbl=abc
> 2016-07-13 13:40:43,637 INFO [org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl ip=unknown-ip-addr cmd=get_table : db=default tbl=abc
> 2016-07-13 13:40:43,693 INFO [org.apache.spark.storage.BlockManagerInfo:58] - Removed broadcast_2_piece0 on localhost:50647 in memory (size: 1176.0 B, free: 1125.7 MB)
> 2016-07-13 13:40:43,700 INFO [org.apache.spark.ContextCleaner:58] - Cleaned accumulator 2
> 2016-07-13 13:40:43,702 INFO [org.apache.spark.storage.BlockManagerInfo:58] - Removed broadcast_1_piece0 on localhost:50647 in memory (size: 19.4 KB, free: 1125.7 MB)
> 2016-07-13 13:40:43,702 INFO [org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : db=default tbl=abc
> 2016-07-13 13:40:43,703 INFO [org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl ip=unknown-ip-addr cmd=get_table : db=default tbl=abc
> Exception in thread "main" java.lang.RuntimeException: Relation[card_id#26,tag_name#27,v#28] ParquetRelation
> requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE statement generates the same number of columns as its schema.
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:68)
> at org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:58)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
> at org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:58)
> at org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:57)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
> at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
> at scala.collection.immutable.List.foldLeft(List.scala:84)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
> at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
> at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
> at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:39)
> at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:38)
> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:43)
> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:43)
> at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
> at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
> at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
> at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
> at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
> at CubeDemoTest.main(CubeDemoTest.java:52)
> {code}
> The metastore is backed by MySQL; here is the COLUMNS_V2 data for table abc (the columns should be CD_ID, COMMENT, COLUMN_NAME, TYPE_NAME, INTEGER_IDX):
> {code}
> 447 from deserializer col array<string> 0
> 447 v2 double 1
> {code}
> The SQL statement "alter table abc add columns(v2 double)" did write the new column v2's metadata into MySQL,
> but Spark SQL cannot read it correctly.
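> This behavior is consistent with Spark 1.6 keeping its own copy of a data source table's schema in the metastore table properties (the spark.sql.sources.schema.* keys), which a Hive-level ALTER TABLE does not update. As a hedged workaround sketch (not from the report; the warehouse path below is an assumption), the table's Parquet files can be read directly so that schema merging picks up v2:
> {code}
> // Sketch only: read the underlying Parquet files with schema merging,
> // bypassing the stale schema recorded for the data source table.
> // Assumption: the files live under the default warehouse location.
> DataFrame merged = hive.read()
>     .option("mergeSchema", "true")
>     .parquet("/user/hive/warehouse/abc");
> merged.printSchema();
> merged.show();
> {code}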