Posted to commits@spark.apache.org by we...@apache.org on 2017/07/17 06:58:21 UTC
spark git commit: [SPARK-21354][SQL] INPUT FILE related functions do not support more than one sources
Repository: spark
Updated Branches:
refs/heads/master fd52a747f -> e398c2814
[SPARK-21354][SQL] INPUT FILE related functions do not support more than one sources
### What changes were proposed in this pull request?
The built-in functions `input_file_name`, `input_file_block_start`, and `input_file_block_length` do not support more than one source, and Hive rejects such queries outright. Spark currently does not block them, so the output is ambiguous and non-deterministic: it could come from either side.
```
hive> select *, INPUT__FILE__NAME FROM t1, t2;
FAILED: SemanticException Column INPUT__FILE__NAME Found in more than One Tables/Subqueries
```
This PR blocks such queries and issues an analysis error.
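For example, after this change an analogous Spark query fails at analysis time (a sketch: the table names are illustrative and the CLI output format is approximate, but the error text comes from the new rule):
```
spark-sql> SELECT *, input_file_name() FROM t1 JOIN t2 ON t1.id = t2.id;
Error in query: 'input_file_name' does not support more than one sources;
```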
### How was this patch tested?
Added a test case.
Author: gatorsmile <ga...@gmail.com>
Closes #18580 from gatorsmile/inputFileName.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e398c281
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e398c281
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e398c281
Branch: refs/heads/master
Commit: e398c281467d62fe3c0ffe05048ef6a2fa80285b
Parents: fd52a74
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Jul 17 14:58:14 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jul 17 14:58:14 2017 +0800
----------------------------------------------------------------------
.../spark/sql/execution/datasources/rules.scala | 38 ++++++++++++-
.../sql/internal/BaseSessionStateBuilder.scala | 1 +
.../spark/sql/ColumnExpressionSuite.scala | 57 ++++++++++++++++++++
.../sql/streaming/FileStreamSinkSuite.scala | 3 +-
.../sql/hive/HiveSessionStateBuilder.scala | 21 ++++----
5 files changed, 107 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e398c281/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 41d40aa..b97fa54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
@@ -409,6 +409,42 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
}
}
+
+/**
+ * A rule to do various checks before reading a table.
+ */
+object PreReadCheck extends (LogicalPlan => Unit) {
+ def apply(plan: LogicalPlan): Unit = {
+ plan.foreach {
+ case operator: LogicalPlan =>
+ operator transformExpressionsUp {
+ case e @ (_: InputFileName | _: InputFileBlockLength | _: InputFileBlockStart) =>
+ checkNumInputFileBlockSources(e, operator)
+ e
+ }
+ }
+ }
+
+ private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
+ operator match {
+ case _: CatalogRelation => 1
+ case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
+ case _: LeafNode => 0
+ // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.
+ case u: Union =>
+ if (u.children.map(checkNumInputFileBlockSources(e, _)).sum >= 1) 1 else 0
+ case o =>
+ val numInputFileBlockSources = o.children.map(checkNumInputFileBlockSources(e, _)).sum
+ if (numInputFileBlockSources > 1) {
+ e.failAnalysis(s"'${e.prettyName}' does not support more than one sources")
+ } else {
+ numInputFileBlockSources
+ }
+ }
+ }
+}
+
+
/**
* A rule to do various checks before inserting into or writing to a data source table.
*/
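To summarize the rule above: `checkNumInputFileBlockSources` counts how many file-backed sources sit under an operator. A `CatalogRelation` or a `HadoopFsRelation` counts as one, any other leaf as zero, and a `Union` as at most one (each output row still originates from a single branch), while every other operator sums its children and fails analysis once that sum exceeds one. A sketch of the resulting behavior (table names illustrative; the same cases appear in the test suite below):
```
-- allowed: each row of a UNION ALL comes from exactly one input file
SELECT input_file_name() FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2);

-- blocked: a joined row may combine data from two input files
SELECT input_file_name() FROM t1 JOIN t2 ON t1.id = t2.id;
```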
http://git-wip-us.apache.org/repos/asf/spark/blob/e398c281/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 267f762..37f4f8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -168,6 +168,7 @@ abstract class BaseSessionStateBuilder(
override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
+ PreReadCheck +:
HiveOnlyCheck +:
customCheckRules
}
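For context, `extendedCheckRules` is just a `Seq[LogicalPlan => Unit]`: each rule traverses the analyzed plan and throws an `AnalysisException` on violations, which is exactly the shape of the new `PreReadCheck`. A minimal sketch of such a rule (the object name and the condition it checks are hypothetical):
```
import org.apache.spark.sql.catalyst.analysis._  // brings failAnalysis into scope
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

object ExampleCheck extends (LogicalPlan => Unit) {
  def apply(plan: LogicalPlan): Unit = plan.foreach {
    // Reject any join that lacks a join condition (hypothetical policy).
    case j: Join if j.condition.isEmpty =>
      j.failAnalysis("joins without a condition are rejected by this example rule")
    case _ => // all other operators pass the check
  }
}
```
A rule like this would be wired in the same way `PreReadCheck` is in the hunk above, by prepending it to `customCheckRules` in a session state builder.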
http://git-wip-us.apache.org/repos/asf/spark/blob/e398c281/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index bc708ca..7c45be2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -530,6 +530,63 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
)
}
+ test("input_file_name, input_file_block_start, input_file_block_length - more than one source") {
+ withTempView("tempView1") {
+ withTable("tab1", "tab2") {
+ val data = sparkContext.parallelize(0 to 9).toDF("id")
+ data.write.saveAsTable("tab1")
+ data.write.saveAsTable("tab2")
+ data.createOrReplaceTempView("tempView1")
+ Seq("input_file_name", "input_file_block_start", "input_file_block_length").foreach { f =>
+ val e = intercept[AnalysisException] {
+ sql(s"SELECT *, $f() FROM tab1 JOIN tab2 ON tab1.id = tab2.id")
+ }.getMessage
+ assert(e.contains(s"'$f' does not support more than one source"))
+ }
+
+ def checkResult(
+ fromClause: String,
+ exceptionExpected: Boolean,
+ numExpectedRows: Int = 0): Unit = {
+ val stmt = s"SELECT *, input_file_name() FROM ($fromClause)"
+ if (exceptionExpected) {
+ val e = intercept[AnalysisException](sql(stmt)).getMessage
+ assert(e.contains("'input_file_name' does not support more than one source"))
+ } else {
+ assert(sql(stmt).count() == numExpectedRows)
+ }
+ }
+
+ checkResult(
+ "SELECT * FROM tab1 UNION ALL SELECT * FROM tab2 UNION ALL SELECT * FROM tab2",
+ exceptionExpected = false,
+ numExpectedRows = 30)
+
+ checkResult(
+ "(SELECT * FROM tempView1 NATURAL JOIN tab2) UNION ALL SELECT * FROM tab2",
+ exceptionExpected = false,
+ numExpectedRows = 20)
+
+ checkResult(
+ "(SELECT * FROM tab1 UNION ALL SELECT * FROM tab2) NATURAL JOIN tempView1",
+ exceptionExpected = false,
+ numExpectedRows = 20)
+
+ checkResult(
+ "(SELECT * FROM tempView1 UNION ALL SELECT * FROM tab2) NATURAL JOIN tab2",
+ exceptionExpected = true)
+
+ checkResult(
+ "(SELECT * FROM tab1 NATURAL JOIN tab2) UNION ALL SELECT * FROM tab2",
+ exceptionExpected = true)
+
+ checkResult(
+ "(SELECT * FROM tab1 UNION ALL SELECT * FROM tab2) NATURAL JOIN tab2",
+ exceptionExpected = true)
+ }
+ }
+ }
+
test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
withTempPath { dir =>
val data = sparkContext.parallelize(0 to 10).toDF("id")
http://git-wip-us.apache.org/repos/asf/spark/blob/e398c281/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 6676099..a5cf40c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -127,8 +127,7 @@ class FileStreamSinkSuite extends StreamTest {
// Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred
val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
- case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] =>
- baseRelation.asInstanceOf[HadoopFsRelation]
+ case LogicalRelation(baseRelation: HadoopFsRelation, _, _) => baseRelation
}
assert(hadoopdFsRelations.size === 1)
assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
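The hunk above is a pure refactor: the typed extractor pattern `LogicalRelation(baseRelation: HadoopFsRelation, _, _)` performs the `isInstanceOf` test and the `asInstanceOf` cast in a single step. A minimal standalone illustration (the types are hypothetical):
```
sealed trait Rel
case class FsRel(path: String) extends Rel
case object OtherRel extends Rel

val rels: Seq[Rel] = Seq(FsRel("/tmp/a"), OtherRel)
// A typed pattern both checks the runtime class and binds the cast value.
val fsOnly = rels.collect { case r: FsRel => r }  // List(FsRel(/tmp/a))
```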
http://git-wip-us.apache.org/repos/asf/spark/blob/e398c281/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index e16c9e4..92cb4ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -69,22 +69,23 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new ResolveHiveSerdeTable(session) +:
- new FindDataSourceTable(session) +:
- new ResolveSQLOnFile(session) +:
- customResolutionRules
+ new FindDataSourceTable(session) +:
+ new ResolveSQLOnFile(session) +:
+ customResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
new DetermineTableStats(session) +:
- RelationConversions(conf, catalog) +:
- PreprocessTableCreation(session) +:
- PreprocessTableInsertion(conf) +:
- DataSourceAnalysis(conf) +:
- HiveAnalysis +:
- customPostHocResolutionRules
+ RelationConversions(conf, catalog) +:
+ PreprocessTableCreation(session) +:
+ PreprocessTableInsertion(conf) +:
+ DataSourceAnalysis(conf) +:
+ HiveAnalysis +:
+ customPostHocResolutionRules
override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
- customCheckRules
+ PreReadCheck +:
+ customCheckRules
}
/**