Posted to commits@spark.apache.org by li...@apache.org on 2016/02/23 09:13:27 UTC

spark git commit: [SPARK-13263][SQL] SQL Generation Support for Tablesample

Repository: spark
Updated Branches:
  refs/heads/master 5cd3e6f60 -> 87250580f


[SPARK-13263][SQL] SQL Generation Support for Tablesample

In the parser, the tableSample clause is part of the tableSource rule:
```
tableSource
init { gParent.pushMsg("table source", state); }
after { gParent.popMsg(state); }
    : tabname=tableName
    ((tableProperties) => props=tableProperties)?
    ((tableSample) => ts=tableSample)?
    ((KW_AS) => (KW_AS alias=Identifier)
    |
    (Identifier) => (alias=Identifier))?
    -> ^(TOK_TABREF $tabname $props? $ts? $alias?)
    ;
```

Two typical queries using TABLESAMPLE are:
```
    "SELECT s.id FROM t0 TABLESAMPLE(10 PERCENT) s"
    "SELECT * FROM t0 TABLESAMPLE(0.1 PERCENT)"
```
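
For context, the regenerated SQL can be inspected by feeding an analyzed plan to the SQLBuilder touched in this patch. A minimal sketch, assuming a Hive-enabled SQLContext named `sqlContext` and that `toSQL` still returns an `Option[String]` at this point in the codebase:
```
import org.apache.spark.sql.hive.SQLBuilder

// Parse and analyze a TABLESAMPLE query, then convert the plan back to SQL text.
val df = sqlContext.sql("SELECT s.id FROM t0 TABLESAMPLE(10 PERCENT) s")
val generated = new SQLBuilder(df.queryExecution.analyzed, sqlContext).toSQL

// Because the Sample node is tagged with isTableSample = true, SQLBuilder emits a
// TABLESAMPLE(... PERCENT) clause instead of bailing out on the Sample operator.
generated.foreach(println)
```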

For reference, here is the analyzed logical plan of a TABLESAMPLE query:
```
sql("SELECT * FROM t0 TABLESAMPLE(0.1 PERCENT)").explain(true)

== Analyzed Logical Plan ==
id: bigint
Project [id#16L]
+- Sample 0.0, 0.001, false, 381
   +- Subquery t0
      +- Relation[id#16L] ParquetRelation
```
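
The isTableSample flag is carried in a second (curried) parameter list on Sample, so existing five-argument constructor calls and pattern matches stay source-compatible, while otherCopyArgs makes tree transformations preserve the flag. A minimal sketch of the two construction paths (the imports and the LocalRelation are illustrative, not part of this patch):
```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Sample}
import org.apache.spark.sql.types.LongType

val relation = LocalRelation(AttributeReference("id", LongType)())

// Parser path: TABLESAMPLE sets the flag so SQL generation can restore the clause.
val fromParser = Sample(0.0, 0.001, withReplacement = false, 381L, relation)(isTableSample = true)

// API path (e.g. DataFrame.sample): the curried argument defaults to false.
val fromApi = Sample(0.0, 0.5, withReplacement = false, 42L, relation)()
```
Because the flag sits outside the primary constructor, case-class equality and pattern matching ignore it; that is why otherCopyArgs must hand it back to makeCopy.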

Thanks! cc liancheng

Author: gatorsmile <ga...@gmail.com>
Author: xiaoli <li...@gmail.com>
Author: Xiao Li <xi...@Xiaos-MacBook-Pro.local>

This patch had conflicts when merged, resolved by
Committer: Cheng Lian <li...@databricks.com>

Closes #11148 from gatorsmile/tablesplitsample.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87250580
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87250580
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87250580

Branch: refs/heads/master
Commit: 87250580f214cb7c4dff01c5a3498ea6cb79a27e
Parents: 5cd3e6f
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Feb 23 16:13:09 2016 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Tue Feb 23 16:13:09 2016 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/CatalystQl.scala  |  6 +-
 .../sql/catalyst/optimizer/Optimizer.scala      |  2 +-
 .../catalyst/plans/logical/basicOperators.scala |  5 +-
 .../optimizer/FilterPushdownSuite.scala         |  4 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  4 +-
 .../scala/org/apache/spark/sql/Dataset.scala    |  2 +-
 .../org/apache/spark/sql/hive/SQLBuilder.scala  | 18 ++++
 .../spark/sql/hive/LogicalPlanToSQLSuite.scala  | 98 ++++++++++++++------
 8 files changed, 103 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/87250580/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
index 069c665..a0a56d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
@@ -499,12 +499,14 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
               s"Sampling fraction ($fraction) must be on interval [0, 100]")
             Sample(0.0, fraction.toDouble / 100, withReplacement = false,
               (math.random * 1000).toInt,
-              relation)
+              relation)(
+              isTableSample = true)
           case Token("TOK_TABLEBUCKETSAMPLE",
           Token(numerator, Nil) ::
             Token(denominator, Nil) :: Nil) =>
             val fraction = numerator.toDouble / denominator.toDouble
-            Sample(0.0, fraction, withReplacement = false, (math.random * 1000).toInt, relation)
+            Sample(0.0, fraction, withReplacement = false, (math.random * 1000).toInt, relation)(
+              isTableSample = true)
           case a =>
             noParseRule("Sampling", a)
         }.getOrElse(relation)

http://git-wip-us.apache.org/repos/asf/spark/blob/87250580/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1554382..1f05f20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -123,7 +123,7 @@ object SamplePushDown extends Rule[LogicalPlan] {
     // Push down projection into sample
     case Project(projectList, s @ Sample(lb, up, replace, seed, child)) =>
       Sample(lb, up, replace, seed,
-        Project(projectList, child))
+        Project(projectList, child))()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/87250580/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 70ecbce..c98d33d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -637,15 +637,18 @@ case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
  * @param withReplacement Whether to sample with replacement.
  * @param seed the random seed
  * @param child the LogicalPlan
+ * @param isTableSample Is created from TABLESAMPLE in the parser.
  */
 case class Sample(
     lowerBound: Double,
     upperBound: Double,
     withReplacement: Boolean,
     seed: Long,
-    child: LogicalPlan) extends UnaryNode {
+    child: LogicalPlan)(
+    val isTableSample: java.lang.Boolean = false) extends UnaryNode {
 
   override def output: Seq[Attribute] = child.output
+  override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/87250580/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 7805723..70b34cb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -640,7 +640,7 @@ class FilterPushdownSuite extends PlanTest {
   test("push project and filter down into sample") {
     val x = testRelation.subquery('x)
     val originalQuery =
-      Sample(0.0, 0.6, false, 11L, x).select('a)
+      Sample(0.0, 0.6, false, 11L, x)().select('a)
 
     val originalQueryAnalyzed =
       EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery))
@@ -648,7 +648,7 @@ class FilterPushdownSuite extends PlanTest {
     val optimized = Optimize.execute(originalQueryAnalyzed)
 
     val correctAnswer =
-      Sample(0.0, 0.6, false, 11L, x.select('a))
+      Sample(0.0, 0.6, false, 11L, x.select('a))()
 
     comparePlans(optimized, correctAnswer.analyze)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/87250580/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index e3412f7..f590ac0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1041,7 +1041,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame = withPlan {
-    Sample(0.0, fraction, withReplacement, seed, logicalPlan)
+    Sample(0.0, fraction, withReplacement, seed, logicalPlan)()
   }
 
   /**
@@ -1073,7 +1073,7 @@ class DataFrame private[sql](
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted))
+      new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted)())
     }.toArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/87250580/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ea7e725..dd1fbcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -564,7 +564,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long) : Dataset[T] =
-    withPlan(Sample(0.0, fraction, withReplacement, seed, _))
+    withPlan(Sample(0.0, fraction, withReplacement, seed, _)())
 
   /**
    * Returns a new [[Dataset]] by sampling a fraction of records, using a random seed.

http://git-wip-us.apache.org/repos/asf/spark/blob/87250580/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index e66cc12..5182af9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -91,6 +91,23 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     case Limit(limitExpr, child) =>
       s"${toSQL(child)} LIMIT ${limitExpr.sql}"
 
+    case p: Sample if p.isTableSample =>
+      val fraction = math.min(100, math.max(0, (p.upperBound - p.lowerBound) * 100))
+      p.child match {
+        case m: MetastoreRelation =>
+          val aliasName = m.alias.getOrElse("")
+          build(
+            s"`${m.databaseName}`.`${m.tableName}`",
+            "TABLESAMPLE(" + fraction + " PERCENT)",
+            aliasName)
+        case s: SubqueryAlias =>
+          val aliasName = if (s.child.isInstanceOf[SubqueryAlias]) s.alias else ""
+          val plan = if (s.child.isInstanceOf[SubqueryAlias]) s.child else s
+          build(toSQL(plan), "TABLESAMPLE(" + fraction + " PERCENT)", aliasName)
+        case _ =>
+          build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)")
+      }
+
     case p: Filter =>
       val whereOrHaving = p.child match {
         case _: Aggregate => "HAVING"
@@ -232,6 +249,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
             | OneRowRelation
             | _: LocalLimit
             | _: GlobalLimit
+            | _: Sample
         ) => plan
 
         case plan: Project =>

http://git-wip-us.apache.org/repos/asf/spark/blob/87250580/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 28559ea..fa78f5a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -26,25 +26,32 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   import testImplicits._
 
   protected override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS parquet_t0")
+    sql("DROP TABLE IF EXISTS parquet_t1")
+    sql("DROP TABLE IF EXISTS parquet_t2")
     sql("DROP TABLE IF EXISTS t0")
-    sql("DROP TABLE IF EXISTS t1")
-    sql("DROP TABLE IF EXISTS t2")
 
-    sqlContext.range(10).write.saveAsTable("t0")
+    sqlContext.range(10).write.saveAsTable("parquet_t0")
+    sql("CREATE TABLE t0 AS SELECT * FROM parquet_t0")
 
     sqlContext
       .range(10)
       .select('id as 'key, concat(lit("val_"), 'id) as 'value)
       .write
-      .saveAsTable("t1")
+      .saveAsTable("parquet_t1")
 
-    sqlContext.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write.saveAsTable("t2")
+    sqlContext
+      .range(10)
+      .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+      .write
+      .saveAsTable("parquet_t2")
   }
 
   override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS parquet_t0")
+    sql("DROP TABLE IF EXISTS parquet_t1")
+    sql("DROP TABLE IF EXISTS parquet_t2")
     sql("DROP TABLE IF EXISTS t0")
-    sql("DROP TABLE IF EXISTS t1")
-    sql("DROP TABLE IF EXISTS t2")
   }
 
   private def checkHiveQl(hiveQl: String): Unit = {
@@ -83,7 +90,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   }
 
   test("in") {
-    checkHiveQl("SELECT id FROM t0 WHERE id IN (1, 2, 3)")
+    checkHiveQl("SELECT id FROM parquet_t0 WHERE id IN (1, 2, 3)")
   }
 
   test("not in") {
@@ -95,11 +102,11 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   }
 
   test("aggregate function in having clause") {
-    checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key HAVING MAX(key) > 0")
+    checkHiveQl("SELECT COUNT(value) FROM parquet_t1 GROUP BY key HAVING MAX(key) > 0")
   }
 
   test("aggregate function in order by clause") {
-    checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key ORDER BY MAX(key)")
+    checkHiveQl("SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY MAX(key)")
   }
 
   // When there are multiple aggregate functions in ORDER BY clause, all of them are extracted into
@@ -107,11 +114,11 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   // execution since these aliases have different expression ID.  But this introduces name collision
   // when converting resolved plans back to SQL query strings as expression IDs are stripped.
   test("aggregate function in order by clause with multiple order keys") {
-    checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key ORDER BY key, MAX(key)")
+    checkHiveQl("SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY key, MAX(key)")
   }
 
   test("type widening in union") {
-    checkHiveQl("SELECT id FROM t0 UNION ALL SELECT CAST(id AS INT) AS id FROM t0")
+    checkHiveQl("SELECT id FROM parquet_t0 UNION ALL SELECT CAST(id AS INT) AS id FROM parquet_t0")
   }
 
   test("union distinct") {
@@ -124,9 +131,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   // UNION ALL (SELECT  `t0`.`id` FROM `default`.`t0`))
   // UNION ALL (SELECT  `t0`.`id` FROM `default`.`t0`)) AS u_1
   test("three-child union") {
-    checkHiveQl("SELECT id FROM t0 UNION ALL SELECT id FROM t0 UNION ALL SELECT id FROM t0")
+    checkHiveQl(
+      """
+        |SELECT id FROM parquet_t0
+        |UNION ALL SELECT id FROM parquet_t0
+        |UNION ALL SELECT id FROM parquet_t0
+      """.stripMargin)
   }
 
+
   test("intersect") {
     checkHiveQl("SELECT * FROM t0 INTERSECT SELECT * FROM t0")
   }
@@ -136,59 +149,90 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   }
 
   test("self join") {
-    checkHiveQl("SELECT x.key FROM t1 x JOIN t1 y ON x.key = y.key")
+    checkHiveQl("SELECT x.key FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key")
   }
 
   test("self join with group by") {
-    checkHiveQl("SELECT x.key, COUNT(*) FROM t1 x JOIN t1 y ON x.key = y.key group by x.key")
+    checkHiveQl(
+      "SELECT x.key, COUNT(*) FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key group by x.key")
   }
 
-
   test("case") {
-    checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 WHEN id % 2 = 0 THEN 1 END FROM t0")
+    checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 WHEN id % 2 = 0 THEN 1 END FROM parquet_t0")
   }
 
   test("case with else") {
-    checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 ELSE 1 END FROM t0")
+    checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 ELSE 1 END FROM parquet_t0")
   }
 
   test("case with key") {
-    checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' END FROM t0")
+    checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' END FROM parquet_t0")
   }
 
   test("case with key and else") {
-    checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' ELSE 'baz' END FROM t0")
+    checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' ELSE 'baz' END FROM parquet_t0")
   }
 
   test("select distinct without aggregate functions") {
-    checkHiveQl("SELECT DISTINCT id FROM t0")
+    checkHiveQl("SELECT DISTINCT id FROM parquet_t0")
   }
 
   test("cluster by") {
-    checkHiveQl("SELECT id FROM t0 CLUSTER BY id")
+    checkHiveQl("SELECT id FROM parquet_t0 CLUSTER BY id")
   }
 
   test("distribute by") {
-    checkHiveQl("SELECT id FROM t0 DISTRIBUTE BY id")
+    checkHiveQl("SELECT id FROM parquet_t0 DISTRIBUTE BY id")
   }
 
   test("distribute by with sort by") {
-    checkHiveQl("SELECT id FROM t0 DISTRIBUTE BY id SORT BY id")
+    checkHiveQl("SELECT id FROM parquet_t0 DISTRIBUTE BY id SORT BY id")
   }
 
   test("distinct aggregation") {
-    checkHiveQl("SELECT COUNT(DISTINCT id) FROM t0")
+    checkHiveQl("SELECT COUNT(DISTINCT id) FROM parquet_t0")
+  }
+
+  test("TABLESAMPLE") {
+    // Project [id#2L]
+    // +- Sample 0.0, 1.0, false, ...
+    //    +- Subquery s
+    //       +- Subquery parquet_t0
+    //          +- Relation[id#2L] ParquetRelation
+    checkHiveQl("SELECT s.id FROM parquet_t0 TABLESAMPLE(100 PERCENT) s")
+
+    // Project [id#2L]
+    // +- Sample 0.0, 1.0, false, ...
+    //    +- Subquery parquet_t0
+    //       +- Relation[id#2L] ParquetRelation
+    checkHiveQl("SELECT * FROM parquet_t0 TABLESAMPLE(100 PERCENT)")
+
+    // Project [id#21L]
+    // +- Sample 0.0, 1.0, false, ...
+    //    +- MetastoreRelation default, t0, Some(s)
+    checkHiveQl("SELECT s.id FROM t0 TABLESAMPLE(100 PERCENT) s")
+
+    // Project [id#24L]
+    // +- Sample 0.0, 1.0, false, ...
+    //    +- MetastoreRelation default, t0, None
+    checkHiveQl("SELECT * FROM t0 TABLESAMPLE(100 PERCENT)")
+
+    // When a sampling fraction is not 100%, the returned results are random.
+    // Thus, added an always-false filter here to check if the generated plan can be successfully
+    // executed.
+    checkHiveQl("SELECT s.id FROM parquet_t0 TABLESAMPLE(0.1 PERCENT) s WHERE 1=0")
+    checkHiveQl("SELECT * FROM parquet_t0 TABLESAMPLE(0.1 PERCENT) WHERE 1=0")
   }
 
   // TODO Enable this
   // Query plans transformed by DistinctAggregationRewriter are not recognized yet
   ignore("multi-distinct columns") {
-    checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
+    checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM parquet_t2 GROUP BY a")
   }
 
   test("persisted data source relations") {
     Seq("orc", "json", "parquet").foreach { format =>
-      val tableName = s"${format}_t0"
+      val tableName = s"${format}_parquet_t0"
       withTable(tableName) {
         sqlContext.range(10).write.format(format).saveAsTable(tableName)
         checkHiveQl(s"SELECT id FROM $tableName")

