Posted to issues@kudu.apache.org by "Dan Burkert (JIRA)" <ji...@apache.org> on 2018/07/27 19:47:00 UTC

[jira] [Created] (KUDU-2518) SparkSQL queries without temporary tables

Dan Burkert created KUDU-2518:
---------------------------------

             Summary: SparkSQL queries without temporary tables
                 Key: KUDU-2518
                 URL: https://issues.apache.org/jira/browse/KUDU-2518
             Project: Kudu
          Issue Type: Improvement
          Components: hms, spark
    Affects Versions: 1.7.1
            Reporter: Dan Burkert


One long-standing ergonomic issue with the Kudu/SparkSQL integration is the requirement to register Kudu tables as temp tables before they can be scanned using a SQL string ({{sql("SELECT * FROM my_kudu_table")}}).  Ideally SparkSQL could query Kudu tables that it discovers via the HMS with no additional configuration.  Yesterday I explored what it would take to get there, and I found some interesting things.
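
For context, the workflow today looks roughly like this (a minimal sketch; the master address and table name are placeholders):

{code:java}
// Today a Kudu table must be loaded through the data source API and
// registered as a temporary view before a SQL string can reference it.
val df = spark.read
  .format("org.apache.kudu.spark.kudu")
  .option("kudu.master", "kudu-master:7051")  // placeholder address
  .option("kudu.table", "my_kudu_table")
  .load()
df.createOrReplaceTempView("my_kudu_table")

spark.sql("SELECT * FROM my_kudu_table").show()
{code}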


If the HMS table contains a {{spark.sql.sources.provider}} table property with a value like {{org.apache.kudu.spark.kudu.DefaultSource}}, SparkSQL will automatically instantiate the corresponding {{RelationProvider}} class, passing it a {{SQLContext}} and a map of parameters that Spark fills in from the table's HDFS URI and storage properties.  The current plan for Kudu + HMS integration (KUDU-2191) is not to set any storage properties; instead, attributes like the master addresses and table ID will be stored as table properties.  As a result, SparkSQL instantiates a Kudu {{DefaultSource}} but doesn't pass it necessary arguments like the table name or master addresses.  Getting this far required adding a dummy {{org.apache.kudu.hive.KuduStorageHandler}} class to the classpath so that the Hive client wouldn't choke on the bogus storage handler class name.  The stacktrace from Spark attempting to instantiate the {{DefaultSource}} is provided below.
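
For reference, the interface being invoked here is {{RelationProvider.createRelation}}, which receives nothing beyond that parameters map.  A minimal sketch of what a Kudu-side implementation has to pull out of it (the class name is hypothetical; the key names follow the existing kudu-spark options):

{code:java}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

// Sketch only: in the failure below, the parameters map arrives empty,
// so neither key can be resolved.
class SketchKuduProvider extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    val tableName = parameters.getOrElse("kudu.table",
      throw new IllegalArgumentException("kudu.table must be specified"))
    val masterAddrs = parameters.getOrElse("kudu.master", "localhost:7051")
    ??? // a real implementation would construct a KuduRelation from these
  }
}
{code}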

{code:java}
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1532719985143).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("DESCRIBE TABLE t1")
org.spark_project.guava.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'.  parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
  at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
  at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
  at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided
Caused by: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'.  parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
  at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
  at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
  at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:81)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
  at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
  ... 96 more

scala>{code}
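
The empty {{parameters}} map in the error above lines up with where Spark sources the options: they are built from the table's storage properties and location URI, not from its table properties.  One way to confirm this from the same shell (a sketch, reusing the {{t1}} table from the session above):

{code:java}
import org.apache.spark.sql.catalyst.TableIdentifier

// The data source options come from storage properties + location URI;
// the KUDU-2191 plan stores its metadata in table properties instead.
val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
println(catalogTable.storage.properties)  // Map() -- nothing reaches DefaultSource
println(catalogTable.properties)          // the kudu.* attributes live here
{code}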

After striking out with the existing interfaces, I looked at the {{DataSourceRegister}} API, which is part of the {{DataSourceV2}} effort underway in Spark.  It's not clear that this API actually provides more context when creating relations: we need the table name and master addresses from the table properties, but options are still just passed as a flat map in {{DataSourceOptions}}.  More significantly, the {{spark.sql.sources.provider}} property doesn't appear to work correctly with {{DataSourceV2}} implementations at all; resolution fails with an {{AnalysisException}}:

{code:java}
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1532720634224).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("DESCRIBE TABLE t1")
org.apache.spark.sql.AnalysisException: org.apache.kudu.spark.KuduDataSource is not a valid Spark SQL Data Source.;
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:415)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
  at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
  at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
  at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided

scala>{code}

{{org.apache.kudu.spark.KuduDataSource}} is a dummy class I put on the classpath and set as the {{spark.sql.sources.provider}} property on the Hive metastore table:

{code:java}
import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.StructType

// Dummy DataSourceV2 implementation: it only needs to exist on the classpath
// so that Spark's data source resolution can be exercised against it.
class KuduDataSource extends DataSourceV2
    with DataSourceRegister
    with ReadSupport {

  override def shortName(): String = "kudu"

  override def createReader(options: DataSourceOptions): DataSourceReader = {
    new KuduDataSourceReader(options)
  }
}

// Reader stub; the ??? bodies are never reached because resolution fails first.
class KuduDataSourceReader(val options: DataSourceOptions) extends DataSourceReader {

  override def readSchema(): StructType = ???

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
}
{code}
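
Even if resolution succeeded, the reader would still only see whatever lands in the flat {{DataSourceOptions}} map.  A hypothetical sketch of the extraction a real Kudu reader would need (the helper name is made up; the key names follow the existing kudu-spark options):

{code:java}
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Hypothetical helper: DataSourceV2 hands the reader a flat string map, so
// kudu.table and kudu.master would still have to be injected into that map
// from the HMS table properties -- which is exactly what's missing today.
def requiredOption(options: DataSourceOptions, key: String): String = {
  val value = options.get(key)  // java.util.Optional[String]
  if (!value.isPresent) {
    throw new IllegalArgumentException(s"required option '$key' not set")
  }
  value.get()
}

// e.g. requiredOption(options, "kudu.table")
//      requiredOption(options, "kudu.master")
{code}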



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)