You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ue...@apache.org on 2018/07/27 14:03:03 UTC
spark git commit: [SPARK-23928][SQL] Add shuffle collection function.
Repository: spark
Updated Branches:
refs/heads/master 21fcac164 -> ef6c8395c
[SPARK-23928][SQL] Add shuffle collection function.
## What changes were proposed in this pull request?
This PR adds a new collection function, shuffle, which generates a random permutation of the given array. The implementation uses the "inside-out" version of the Fisher-Yates algorithm.
## How was this patch tested?
New tests are added to CollectionExpressionsSuite.scala and DataFrameFunctionsSuite.scala.
Author: Takuya UESHIN <ue...@databricks.com>
Author: pkuwm <ih...@gmail.com>
Closes #21802 from ueshin/issues/SPARK-23928/shuffle.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef6c8395
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef6c8395
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef6c8395
Branch: refs/heads/master
Commit: ef6c8395c483954857547cbd75f84a6814317d13
Parents: 21fcac1
Author: pkuwm <ih...@gmail.com>
Authored: Fri Jul 27 23:02:48 2018 +0900
Committer: Takuya UESHIN <ue...@databricks.com>
Committed: Fri Jul 27 23:02:48 2018 +0900
----------------------------------------------------------------------
python/pyspark/sql/functions.py | 17 +++
.../spark/sql/catalyst/analysis/Analyzer.scala | 7 +-
.../catalyst/analysis/FunctionRegistry.scala | 1 +
.../expressions/collectionOperations.scala | 106 +++++++++++++++++++
.../catalyst/util/RandomIndicesGenerator.scala | 45 ++++++++
.../CollectionExpressionsSuite.scala | 69 ++++++++++++
.../scala/org/apache/spark/sql/functions.scala | 10 ++
.../spark/sql/DataFrameFunctionsSuite.scala | 65 ++++++++++++
8 files changed, 317 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f2e6633..0a88e48 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2382,6 +2382,23 @@ def array_sort(col):
return Column(sc._jvm.functions.array_sort(_to_java_column(col)))
+@since(2.4)
+def shuffle(col):
+ """
+ Collection function: Generates a random permutation of the given array.
+
+ .. note:: The function is non-deterministic.
+
+ :param col: name of column or expression
+
+ >>> df = spark.createDataFrame([([1, 20, 3, 5],), ([1, 20, None, 3],)], ['data'])
+ >>> df.select(shuffle(df.data).alias('s')).collect() # doctest: +SKIP
+ [Row(s=[3, 1, 5, 20]), Row(s=[20, None, 3, 1])]
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.shuffle(_to_java_column(col)))
+
+
@since(1.5)
@ignore_unicode_prefix
def reverse(col):
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8e8f8e3..d18509f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -181,7 +181,7 @@ class Analyzer(
TimeWindowing ::
ResolveInlineTables(conf) ::
ResolveTimeZone(conf) ::
- ResolvedUuidExpressions ::
+ ResolveRandomSeed ::
TypeCoercion.typeCoercionRules(conf) ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
@@ -2100,15 +2100,16 @@ class Analyzer(
}
/**
- * Set the seed for random number generation in Uuid expressions.
+ * Set the seed for random number generation.
*/
- object ResolvedUuidExpressions extends Rule[LogicalPlan] {
+ object ResolveRandomSeed extends Rule[LogicalPlan] {
private lazy val random = new Random()
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case p if p.resolved => p
case p => p transformExpressionsUp {
case Uuid(None) => Uuid(Some(random.nextLong()))
+ case Shuffle(child, None) => Shuffle(child, Some(random.nextLong()))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d696ce9..adc4837 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -429,6 +429,7 @@ object FunctionRegistry {
expression[Size]("cardinality"),
expression[ArraysZip]("arrays_zip"),
expression[SortArray]("sort_array"),
+ expression[Shuffle]("shuffle"),
expression[ArrayMin]("array_min"),
expression[ArrayMax]("array_max"),
expression[Reverse]("reverse"),
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index b3d04bf..b1d91ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -1204,6 +1204,112 @@ case class ArraySort(child: Expression) extends UnaryExpression with ArraySortLi
}
/**
+ * Returns a random permutation of the given array.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(array) - Returns a random permutation of the given array.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(array(1, 20, 3, 5));
+ [3, 1, 5, 20]
+ > SELECT _FUNC_(array(1, 20, null, 3));
+ [20, null, 3, 1]
+ """,
+ note = "The function is non-deterministic.",
+ since = "2.4.0")
+case class Shuffle(child: Expression, randomSeed: Option[Long] = None)
+ extends UnaryExpression with ExpectsInputTypes with Stateful {
+
+ def this(child: Expression) = this(child, None)
+
+ override lazy val resolved: Boolean =
+ childrenResolved && checkInputDataTypes().isSuccess && randomSeed.isDefined
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+ override def dataType: DataType = child.dataType
+
+ @transient lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
+
+ @transient private[this] var random: RandomIndicesGenerator = _
+
+ override protected def initializeInternal(partitionIndex: Int): Unit = {
+ random = RandomIndicesGenerator(randomSeed.get + partitionIndex)
+ }
+
+ override protected def evalInternal(input: InternalRow): Any = {
+ val value = child.eval(input)
+ if (value == null) {
+ null
+ } else {
+ val source = value.asInstanceOf[ArrayData]
+ val numElements = source.numElements()
+ val indices = random.getNextIndices(numElements)
+ new GenericArrayData(indices.map(source.get(_, elementType)))
+ }
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ nullSafeCodeGen(ctx, ev, c => shuffleArrayCodeGen(ctx, ev, c))
+ }
+
+ private def shuffleArrayCodeGen(ctx: CodegenContext, ev: ExprCode, childName: String): String = {
+ val randomClass = classOf[RandomIndicesGenerator].getName
+
+ val rand = ctx.addMutableState(randomClass, "rand", forceInline = true)
+ ctx.addPartitionInitializationStatement(
+ s"$rand = new $randomClass(${randomSeed.get}L + partitionIndex);")
+
+ val isPrimitiveType = CodeGenerator.isPrimitiveType(elementType)
+
+ val numElements = ctx.freshName("numElements")
+ val arrayData = ctx.freshName("arrayData")
+
+ val initialization = if (isPrimitiveType) {
+ ctx.createUnsafeArray(arrayData, numElements, elementType, s" $prettyName failed.")
+ } else {
+ val arrayDataClass = classOf[GenericArrayData].getName()
+ s"$arrayDataClass $arrayData = new $arrayDataClass(new Object[$numElements]);"
+ }
+
+ val indices = ctx.freshName("indices")
+ val i = ctx.freshName("i")
+
+ val getValue = CodeGenerator.getValue(childName, elementType, s"$indices[$i]")
+
+ val setFunc = if (isPrimitiveType) {
+ s"set${CodeGenerator.primitiveTypeName(elementType)}"
+ } else {
+ "update"
+ }
+
+ val assignment = if (isPrimitiveType && dataType.asInstanceOf[ArrayType].containsNull) {
+ s"""
+ |if ($childName.isNullAt($indices[$i])) {
+ | $arrayData.setNullAt($i);
+ |} else {
+ | $arrayData.$setFunc($i, $getValue);
+ |}
+ """.stripMargin
+ } else {
+ s"$arrayData.$setFunc($i, $getValue);"
+ }
+
+ s"""
+ |int $numElements = $childName.numElements();
+ |int[] $indices = $rand.getNextIndices($numElements);
+ |$initialization
+ |for (int $i = 0; $i < $numElements; $i++) {
+ | $assignment
+ |}
+ |${ev.value} = $arrayData;
+ """.stripMargin
+ }
+
+ override def freshCopy(): Shuffle = Shuffle(child, randomSeed)
+}
+
+/**
* Returns a reversed string or an array with reverse order of elements.
*/
@ExpressionDescription(
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomIndicesGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomIndicesGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomIndicesGenerator.scala
new file mode 100644
index 0000000..ae05128
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomIndicesGenerator.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.commons.math3.random.MersenneTwister
+
+/**
+ * This class is used to generate a random permutation of the indices 0 until a given length.
+ *
+ * This implementation uses the "inside-out" version of the Fisher-Yates algorithm.
+ * Reference:
+ * https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_%22inside-out%22_algorithm
+ */
+case class RandomIndicesGenerator(randomSeed: Long) {
+ private val random = new MersenneTwister(randomSeed)
+
+ def getNextIndices(length: Int): Array[Int] = {
+ val indices = new Array[Int](length)
+ var i = 0
+ while (i < length) {
+ val j = random.nextInt(i + 1)
+ if (j != i) {
+ indices(i) = indices(j)
+ }
+ indices(j) = i
+ i += 1
+ }
+ indices
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index c7f0da7..5c57285 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Date, Timestamp}
import java.util.TimeZone
+import scala.util.Random
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
@@ -1434,4 +1436,71 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
assert(ArrayUnion(a20, a21).dataType.asInstanceOf[ArrayType].containsNull === false)
assert(ArrayUnion(a20, a22).dataType.asInstanceOf[ArrayType].containsNull === true)
}
+
+ test("Shuffle") {
+ // Primitive-type elements
+ val ai0 = Literal.create(Seq(1, 2, 3, 4, 5), ArrayType(IntegerType, containsNull = false))
+ val ai1 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType, containsNull = false))
+ val ai2 = Literal.create(Seq(null, 1, null, 3), ArrayType(IntegerType, containsNull = true))
+ val ai3 = Literal.create(Seq(2, null, 4, null), ArrayType(IntegerType, containsNull = true))
+ val ai4 = Literal.create(Seq(null, null, null), ArrayType(IntegerType, containsNull = true))
+ val ai5 = Literal.create(Seq(1), ArrayType(IntegerType, containsNull = false))
+ val ai6 = Literal.create(Seq.empty, ArrayType(IntegerType, containsNull = false))
+ val ai7 = Literal.create(null, ArrayType(IntegerType, containsNull = true))
+
+ checkEvaluation(Shuffle(ai0, Some(0)), Seq(4, 1, 2, 3, 5))
+ checkEvaluation(Shuffle(ai1, Some(0)), Seq(3, 1, 2))
+ checkEvaluation(Shuffle(ai2, Some(0)), Seq(3, null, 1, null))
+ checkEvaluation(Shuffle(ai3, Some(0)), Seq(null, 2, null, 4))
+ checkEvaluation(Shuffle(ai4, Some(0)), Seq(null, null, null))
+ checkEvaluation(Shuffle(ai5, Some(0)), Seq(1))
+ checkEvaluation(Shuffle(ai6, Some(0)), Seq.empty)
+ checkEvaluation(Shuffle(ai7, Some(0)), null)
+
+ // Non-primitive-type elements
+ val as0 = Literal.create(Seq("a", "b", "c", "d"), ArrayType(StringType, containsNull = false))
+ val as1 = Literal.create(Seq("a", "b", "c"), ArrayType(StringType, containsNull = false))
+ val as2 = Literal.create(Seq(null, "a", null, "c"), ArrayType(StringType, containsNull = true))
+ val as3 = Literal.create(Seq("b", null, "d", null), ArrayType(StringType, containsNull = true))
+ val as4 = Literal.create(Seq(null, null, null), ArrayType(StringType, containsNull = true))
+ val as5 = Literal.create(Seq("a"), ArrayType(StringType, containsNull = false))
+ val as6 = Literal.create(Seq.empty, ArrayType(StringType, containsNull = false))
+ val as7 = Literal.create(null, ArrayType(StringType, containsNull = true))
+ val aa = Literal.create(
+ Seq(Seq("a", "b"), Seq("c", "d"), Seq("e")),
+ ArrayType(ArrayType(StringType)))
+
+ checkEvaluation(Shuffle(as0, Some(0)), Seq("d", "a", "b", "c"))
+ checkEvaluation(Shuffle(as1, Some(0)), Seq("c", "a", "b"))
+ checkEvaluation(Shuffle(as2, Some(0)), Seq("c", null, "a", null))
+ checkEvaluation(Shuffle(as3, Some(0)), Seq(null, "b", null, "d"))
+ checkEvaluation(Shuffle(as4, Some(0)), Seq(null, null, null))
+ checkEvaluation(Shuffle(as5, Some(0)), Seq("a"))
+ checkEvaluation(Shuffle(as6, Some(0)), Seq.empty)
+ checkEvaluation(Shuffle(as7, Some(0)), null)
+ checkEvaluation(Shuffle(aa, Some(0)), Seq(Seq("e"), Seq("a", "b"), Seq("c", "d")))
+
+ val r = new Random()
+ val seed1 = Some(r.nextLong())
+ assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) ===
+ evaluateWithoutCodegen(Shuffle(ai0, seed1)))
+ assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) ===
+ evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)))
+ assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) ===
+ evaluateWithUnsafeProjection(Shuffle(ai0, seed1)))
+
+ val seed2 = Some(r.nextLong())
+ assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) !==
+ evaluateWithoutCodegen(Shuffle(ai0, seed2)))
+ assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) !==
+ evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed2)))
+ assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
+ evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
+
+ val shuffle = Shuffle(ai0, seed1)
+ assert(shuffle.fastEquals(shuffle))
+ assert(!shuffle.fastEquals(Shuffle(ai0, seed1)))
+ assert(!shuffle.fastEquals(shuffle.freshCopy()))
+ assert(!shuffle.fastEquals(Shuffle(ai0, seed2)))
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index de1d422..bcd0c94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3546,6 +3546,16 @@ object functions {
def array_max(e: Column): Column = withExpr { ArrayMax(e.expr) }
/**
+ * Returns a random permutation of the given array.
+ *
+ * @note The function is non-deterministic.
+ *
+ * @group collection_funcs
+ * @since 2.4.0
+ */
+ def shuffle(e: Column): Column = withExpr { Shuffle(e.expr) }
+
+ /**
* Returns a reversed string or an array with reverse order of elements.
* @group collection_funcs
* @since 1.5.0
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 5a7bd45..299c96f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -1513,6 +1513,71 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
)
}
+ // Shuffle expressions should produce same results at retries in the same DataFrame.
+ private def checkShuffleResult(df: DataFrame): Unit = {
+ checkAnswer(df, df.collect())
+ }
+
+ test("shuffle function - array for primitive type not containing null") {
+ val idfNotContainsNull = Seq(
+ Seq(1, 9, 8, 7),
+ Seq(5, 8, 9, 7, 2),
+ Seq.empty,
+ null
+ ).toDF("i")
+
+ def testArrayOfPrimitiveTypeNotContainsNull(): Unit = {
+ checkShuffleResult(idfNotContainsNull.select(shuffle('i)))
+ checkShuffleResult(idfNotContainsNull.selectExpr("shuffle(i)"))
+ }
+
+ // Test with local relation, the Project will be evaluated without codegen
+ testArrayOfPrimitiveTypeNotContainsNull()
+ // Test with cached relation, the Project will be evaluated with codegen
+ idfNotContainsNull.cache()
+ testArrayOfPrimitiveTypeNotContainsNull()
+ }
+
+ test("shuffle function - array for primitive type containing null") {
+ val idfContainsNull = Seq[Seq[Integer]](
+ Seq(1, 9, 8, null, 7),
+ Seq(null, 5, 8, 9, 7, 2),
+ Seq.empty,
+ null
+ ).toDF("i")
+
+ def testArrayOfPrimitiveTypeContainsNull(): Unit = {
+ checkShuffleResult(idfContainsNull.select(shuffle('i)))
+ checkShuffleResult(idfContainsNull.selectExpr("shuffle(i)"))
+ }
+
+ // Test with local relation, the Project will be evaluated without codegen
+ testArrayOfPrimitiveTypeContainsNull()
+ // Test with cached relation, the Project will be evaluated with codegen
+ idfContainsNull.cache()
+ testArrayOfPrimitiveTypeContainsNull()
+ }
+
+ test("shuffle function - array for non-primitive type") {
+ val sdf = Seq(
+ Seq("c", "a", "b"),
+ Seq("b", null, "c", null),
+ Seq.empty,
+ null
+ ).toDF("s")
+
+ def testNonPrimitiveType(): Unit = {
+ checkShuffleResult(sdf.select(shuffle('s)))
+ checkShuffleResult(sdf.selectExpr("shuffle(s)"))
+ }
+
+ // Test with local relation, the Project will be evaluated without codegen
+ testNonPrimitiveType()
+ // Test with cached relation, the Project will be evaluated with codegen
+ sdf.cache()
+ testNonPrimitiveType()
+ }
+
private def assertValuesDoNotChangeAfterCoalesceOrUnion(v: Column): Unit = {
import DataFrameFunctionsSuite.CodegenFallbackExpr
for ((codegenFallback, wholeStage) <- Seq((true, false), (false, false), (false, true))) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org