Posted to commits@spark.apache.org by sr...@apache.org on 2017/11/04 11:51:14 UTC

spark git commit: [SPARK-22423][SQL] Scala test source files like TestHiveSingleton.scala should be in scala source root

Repository: spark
Updated Branches:
  refs/heads/master e7adb7d7a -> 7a8412352


[SPARK-22423][SQL] Scala test source files like TestHiveSingleton.scala should be in scala source root

## What changes were proposed in this pull request?

Scala test source files such as TestHiveSingleton.scala should live under the Scala source root (`src/test/scala`) rather than the Java source root (`src/test/java`).

## How was this patch tested?

This patch just moves Scala files from the `java` test source directory to the `scala` test source directory; no new test cases are added. The renames are listed below, with a short build-layout sketch after the list.

```
	renamed:    mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala -> mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
	renamed:    streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala -> streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
	renamed:    streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala -> streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
	renamed:    sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala -> sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
```
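
For context, a minimal, hypothetical build sketch (sbt syntax; not Spark's actual build definition) of the two test source roots involved. sbt, for one, registers both roots and compiles any `*.scala` or `*.java` file it finds under either, which is presumably why the misplaced files built fine: the renames above are a matter of convention, not compilation.

```scala
// build.sbt -- hypothetical minimal project, not Spark's real build.
// These are sbt's defaults, spelled out to show the convention the
// renames above restore: Scala sources under src/test/scala, Java
// sources under src/test/java.
lazy val example = (project in file("."))
  .settings(
    scalaSource in Test := baseDirectory.value / "src" / "test" / "scala",
    javaSource in Test := baseDirectory.value / "src" / "test" / "java"
  )
```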

Author: xubo245 <60...@qq.com>

Closes #19639 from xubo245/scalaDirectory.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a841235
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a841235
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a841235

Branch: refs/heads/master
Commit: 7a8412352e3aaf14527f97c82d0d62f9de39e753
Parents: e7adb7d
Author: xubo245 <60...@qq.com>
Authored: Sat Nov 4 11:51:10 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Nov 4 11:51:10 2017 +0000

----------------------------------------------------------------------
 .../spark/ml/util/IdentifiableSuite.scala       |  41 ---
 .../spark/ml/util/IdentifiableSuite.scala       |  41 +++
 .../spark/sql/hive/test/TestHiveSingleton.scala |  42 ---
 .../spark/sql/hive/test/TestHiveSingleton.scala |  42 +++
 .../apache/spark/streaming/JavaTestUtils.scala  |  99 ------
 .../JavaStreamingListenerWrapperSuite.scala     | 303 -------------------
 .../apache/spark/streaming/JavaTestUtils.scala  |  99 ++++++
 .../JavaStreamingListenerWrapperSuite.scala     | 303 +++++++++++++++++++
 8 files changed, 485 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala b/mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala
deleted file mode 100644
index 878bc66..0000000
--- a/mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml.util
-
-import org.apache.spark.SparkFunSuite
-
-class IdentifiableSuite extends SparkFunSuite {
-
-  import IdentifiableSuite.Test
-
-  test("Identifiable") {
-    val test0 = new Test("test_0")
-    assert(test0.uid === "test_0")
-
-    val test1 = new Test
-    assert(test1.uid.startsWith("test_"))
-  }
-}
-
-object IdentifiableSuite {
-
-  class Test(override val uid: String) extends Identifiable {
-    def this() = this(Identifiable.randomUID("test"))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
new file mode 100644
index 0000000..878bc66
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.util
+
+import org.apache.spark.SparkFunSuite
+
+class IdentifiableSuite extends SparkFunSuite {
+
+  import IdentifiableSuite.Test
+
+  test("Identifiable") {
+    val test0 = new Test("test_0")
+    assert(test0.uid === "test_0")
+
+    val test1 = new Test
+    assert(test1.uid.startsWith("test_"))
+  }
+}
+
+object IdentifiableSuite {
+
+  class Test(override val uid: String) extends Identifiable {
+    def this() = this(Identifiable.randomUID("test"))
+  }
+
+}
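
As a quick orientation for the moved suite, a hedged sketch of the pattern it exercises (hypothetical class name; `Identifiable` and `randomUID` are the APIs shown in the diff above):

```scala
import org.apache.spark.ml.util.Identifiable

// Hypothetical ML helper (not part of this commit): mixing in Identifiable
// gives a component a caller-supplied uid, or an auto-generated one via
// randomUID, exactly as the suite's Test helper demonstrates.
class MyTransformer(override val uid: String) extends Identifiable {
  def this() = this(Identifiable.randomUID("myTransformer"))
}

object MyTransformerExample extends App {
  val t = new MyTransformer
  assert(t.uid.startsWith("myTransformer_"))  // randomUID appends "_<random suffix>"
}
```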

http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
deleted file mode 100644
index df7988f..0000000
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.test
-
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.client.HiveClient
-
-
-trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
-  protected val spark: SparkSession = TestHive.sparkSession
-  protected val hiveContext: TestHiveContext = TestHive
-  protected val hiveClient: HiveClient =
-    spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-
-  protected override def afterAll(): Unit = {
-    try {
-      hiveContext.reset()
-    } finally {
-      super.afterAll()
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
new file mode 100644
index 0000000..df7988f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.test
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
+  protected val spark: SparkSession = TestHive.sparkSession
+  protected val hiveContext: TestHiveContext = TestHive
+  protected val hiveClient: HiveClient =
+    spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+  protected override def afterAll(): Unit = {
+    try {
+      hiveContext.reset()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+}
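
To show what the moved trait gives a test, a hedged usage sketch (hypothetical suite, not part of this commit; it assumes only the members visible in the diff above):

```scala
package org.apache.spark.sql.hive.test

// Hypothetical suite (not in this commit): TestHiveSingleton supplies the
// shared `spark` session, `hiveContext`, and `hiveClient`, and resets Hive
// state in afterAll().
class ExampleHiveSuite extends TestHiveSingleton {
  test("TestHive session is available") {
    assert(spark.sql("SELECT 1").collect().head.getInt(0) === 1)
  }
}
```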

http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
deleted file mode 100644
index 0c4a64c..0000000
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.util.{List => JList}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.api.java.JavaRDDLike
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaDStreamLike, JavaStreamingContext}
-
-/** Exposes streaming test functionality in a Java-friendly way. */
-trait JavaTestBase extends TestSuiteBase {
-
-  /**
-   * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context.
-   * The stream will be derived from the supplied lists of Java objects.
-   */
-  def attachTestInputStream[T](
-      ssc: JavaStreamingContext,
-      data: JList[JList[T]],
-      numPartitions: Int): JavaDStream[T] = {
-    val seqData = data.asScala.map(_.asScala)
-
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
-    new JavaDStream[T](dstream)
-  }
-
-  /**
-   * Attach a provided stream to it's associated StreamingContext as a
-   * [[org.apache.spark.streaming.TestOutputStream]].
-   */
-  def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
-      dstream: JavaDStreamLike[T, This, R]): Unit = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
-    ostream.register()
-  }
-
-  /**
-   * Process all registered streams for a numBatches batches, failing if
-   * numExpectedOutput RDD's are not generated. Generated RDD's are collected
-   * and returned, represented as a list for each batch interval.
-   *
-   * Returns a list of items for each RDD.
-   */
-  def runStreams[V](
-      ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
-    implicit val cm: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-    ssc.getState()
-    val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
-    res.map(_.asJava).toSeq.asJava
-  }
-
-  /**
-   * Process all registered streams for a numBatches batches, failing if
-   * numExpectedOutput RDD's are not generated. Generated RDD's are collected
-   * and returned, represented as a list for each batch interval.
-   *
-   * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
-   * representing one partition.
-   */
-  def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
-      numExpectedOutput: Int): JList[JList[JList[V]]] = {
-    implicit val cm: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-    val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
-    res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava
-  }
-}
-
-object JavaTestUtils extends JavaTestBase {
-  override def maxWaitTimeMillis: Int = 20000
-
-}
-
-object JavaCheckpointTestUtils extends JavaTestBase {
-  override def actuallyWait: Boolean = true
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
deleted file mode 100644
index cfd4323..0000000
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.api.java
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.scheduler._
-
-class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
-
-  test("basic") {
-    val listener = new TestJavaStreamingListener()
-    val listenerWrapper = new JavaStreamingListenerWrapper(listener)
-
-    val streamingStarted = StreamingListenerStreamingStarted(1000L)
-    listenerWrapper.onStreamingStarted(streamingStarted)
-    assert(listener.streamingStarted.time === streamingStarted.time)
-
-    val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
-      streamId = 2,
-      name = "test",
-      active = true,
-      location = "localhost",
-      executorId = "1"
-    ))
-    listenerWrapper.onReceiverStarted(receiverStarted)
-    assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo)
-
-    val receiverStopped = StreamingListenerReceiverStopped(ReceiverInfo(
-      streamId = 2,
-      name = "test",
-      active = false,
-      location = "localhost",
-      executorId = "1"
-    ))
-    listenerWrapper.onReceiverStopped(receiverStopped)
-    assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo)
-
-    val receiverError = StreamingListenerReceiverError(ReceiverInfo(
-      streamId = 2,
-      name = "test",
-      active = false,
-      location = "localhost",
-      executorId = "1",
-      lastErrorMessage = "failed",
-      lastError = "failed",
-      lastErrorTime = System.currentTimeMillis()
-    ))
-    listenerWrapper.onReceiverError(receiverError)
-    assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo)
-
-    val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
-      batchTime = Time(1000L),
-      streamIdToInputInfo = Map(
-        0 -> StreamInputInfo(
-          inputStreamId = 0,
-          numRecords = 1000,
-          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
-        1 -> StreamInputInfo(
-          inputStreamId = 1,
-          numRecords = 2000,
-          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
-      submissionTime = 1001L,
-      None,
-      None,
-      outputOperationInfos = Map(
-        0 -> OutputOperationInfo(
-          batchTime = Time(1000L),
-          id = 0,
-          name = "op1",
-          description = "operation1",
-          startTime = None,
-          endTime = None,
-          failureReason = None),
-        1 -> OutputOperationInfo(
-          batchTime = Time(1000L),
-          id = 1,
-          name = "op2",
-          description = "operation2",
-          startTime = None,
-          endTime = None,
-          failureReason = None))
-    ))
-    listenerWrapper.onBatchSubmitted(batchSubmitted)
-    assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo)
-
-    val batchStarted = StreamingListenerBatchStarted(BatchInfo(
-      batchTime = Time(1000L),
-      streamIdToInputInfo = Map(
-        0 -> StreamInputInfo(
-          inputStreamId = 0,
-          numRecords = 1000,
-          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
-        1 -> StreamInputInfo(
-          inputStreamId = 1,
-          numRecords = 2000,
-          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
-      submissionTime = 1001L,
-      Some(1002L),
-      None,
-      outputOperationInfos = Map(
-        0 -> OutputOperationInfo(
-          batchTime = Time(1000L),
-          id = 0,
-          name = "op1",
-          description = "operation1",
-          startTime = Some(1003L),
-          endTime = None,
-          failureReason = None),
-        1 -> OutputOperationInfo(
-          batchTime = Time(1000L),
-          id = 1,
-          name = "op2",
-          description = "operation2",
-          startTime = Some(1005L),
-          endTime = None,
-          failureReason = None))
-    ))
-    listenerWrapper.onBatchStarted(batchStarted)
-    assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)
-
-    val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
-      batchTime = Time(1000L),
-      streamIdToInputInfo = Map(
-        0 -> StreamInputInfo(
-          inputStreamId = 0,
-          numRecords = 1000,
-          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
-        1 -> StreamInputInfo(
-          inputStreamId = 1,
-          numRecords = 2000,
-          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
-      submissionTime = 1001L,
-      Some(1002L),
-      Some(1010L),
-      outputOperationInfos = Map(
-        0 -> OutputOperationInfo(
-          batchTime = Time(1000L),
-          id = 0,
-          name = "op1",
-          description = "operation1",
-          startTime = Some(1003L),
-          endTime = Some(1004L),
-          failureReason = None),
-        1 -> OutputOperationInfo(
-          batchTime = Time(1000L),
-          id = 1,
-          name = "op2",
-          description = "operation2",
-          startTime = Some(1005L),
-          endTime = Some(1010L),
-          failureReason = None))
-    ))
-    listenerWrapper.onBatchCompleted(batchCompleted)
-    assertBatchInfo(listener.batchCompleted.batchInfo, batchCompleted.batchInfo)
-
-    val outputOperationStarted = StreamingListenerOutputOperationStarted(OutputOperationInfo(
-      batchTime = Time(1000L),
-      id = 0,
-      name = "op1",
-      description = "operation1",
-      startTime = Some(1003L),
-      endTime = None,
-      failureReason = None
-    ))
-    listenerWrapper.onOutputOperationStarted(outputOperationStarted)
-    assertOutputOperationInfo(listener.outputOperationStarted.outputOperationInfo,
-      outputOperationStarted.outputOperationInfo)
-
-    val outputOperationCompleted = StreamingListenerOutputOperationCompleted(OutputOperationInfo(
-      batchTime = Time(1000L),
-      id = 0,
-      name = "op1",
-      description = "operation1",
-      startTime = Some(1003L),
-      endTime = Some(1004L),
-      failureReason = None
-    ))
-    listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
-    assertOutputOperationInfo(listener.outputOperationCompleted.outputOperationInfo,
-      outputOperationCompleted.outputOperationInfo)
-  }
-
-  private def assertReceiverInfo(
-      javaReceiverInfo: JavaReceiverInfo, receiverInfo: ReceiverInfo): Unit = {
-    assert(javaReceiverInfo.streamId === receiverInfo.streamId)
-    assert(javaReceiverInfo.name === receiverInfo.name)
-    assert(javaReceiverInfo.active === receiverInfo.active)
-    assert(javaReceiverInfo.location === receiverInfo.location)
-    assert(javaReceiverInfo.executorId === receiverInfo.executorId)
-    assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
-    assert(javaReceiverInfo.lastError === receiverInfo.lastError)
-    assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)
-  }
-
-  private def assertBatchInfo(javaBatchInfo: JavaBatchInfo, batchInfo: BatchInfo): Unit = {
-    assert(javaBatchInfo.batchTime === batchInfo.batchTime)
-    assert(javaBatchInfo.streamIdToInputInfo.size === batchInfo.streamIdToInputInfo.size)
-    batchInfo.streamIdToInputInfo.foreach { case (streamId, streamInputInfo) =>
-      assertStreamingInfo(javaBatchInfo.streamIdToInputInfo.get(streamId), streamInputInfo)
-    }
-    assert(javaBatchInfo.submissionTime === batchInfo.submissionTime)
-    assert(javaBatchInfo.processingStartTime === batchInfo.processingStartTime.getOrElse(-1))
-    assert(javaBatchInfo.processingEndTime === batchInfo.processingEndTime.getOrElse(-1))
-    assert(javaBatchInfo.schedulingDelay === batchInfo.schedulingDelay.getOrElse(-1))
-    assert(javaBatchInfo.processingDelay === batchInfo.processingDelay.getOrElse(-1))
-    assert(javaBatchInfo.totalDelay === batchInfo.totalDelay.getOrElse(-1))
-    assert(javaBatchInfo.numRecords === batchInfo.numRecords)
-    assert(javaBatchInfo.outputOperationInfos.size === batchInfo.outputOperationInfos.size)
-    batchInfo.outputOperationInfos.foreach { case (outputOperationId, outputOperationInfo) =>
-      assertOutputOperationInfo(
-        javaBatchInfo.outputOperationInfos.get(outputOperationId), outputOperationInfo)
-    }
-  }
-
-  private def assertStreamingInfo(
-      javaStreamInputInfo: JavaStreamInputInfo, streamInputInfo: StreamInputInfo): Unit = {
-    assert(javaStreamInputInfo.inputStreamId === streamInputInfo.inputStreamId)
-    assert(javaStreamInputInfo.numRecords === streamInputInfo.numRecords)
-    assert(javaStreamInputInfo.metadata === streamInputInfo.metadata.asJava)
-    assert(javaStreamInputInfo.metadataDescription === streamInputInfo.metadataDescription.orNull)
-  }
-
-  private def assertOutputOperationInfo(
-      javaOutputOperationInfo: JavaOutputOperationInfo,
-      outputOperationInfo: OutputOperationInfo): Unit = {
-    assert(javaOutputOperationInfo.batchTime === outputOperationInfo.batchTime)
-    assert(javaOutputOperationInfo.id === outputOperationInfo.id)
-    assert(javaOutputOperationInfo.name === outputOperationInfo.name)
-    assert(javaOutputOperationInfo.description === outputOperationInfo.description)
-    assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
-    assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
-    assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
-  }
-}
-
-class TestJavaStreamingListener extends JavaStreamingListener {
-
-  var streamingStarted: JavaStreamingListenerStreamingStarted = null
-  var receiverStarted: JavaStreamingListenerReceiverStarted = null
-  var receiverError: JavaStreamingListenerReceiverError = null
-  var receiverStopped: JavaStreamingListenerReceiverStopped = null
-  var batchSubmitted: JavaStreamingListenerBatchSubmitted = null
-  var batchStarted: JavaStreamingListenerBatchStarted = null
-  var batchCompleted: JavaStreamingListenerBatchCompleted = null
-  var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
-  var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
-
-  override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
-    this.streamingStarted = streamingStarted
-  }
-
-  override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
-    this.receiverStarted = receiverStarted
-  }
-
-  override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
-    this.receiverError = receiverError
-  }
-
-  override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
-    this.receiverStopped = receiverStopped
-  }
-
-  override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
-    this.batchSubmitted = batchSubmitted
-  }
-
-  override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
-    this.batchStarted = batchStarted
-  }
-
-  override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
-    this.batchCompleted = batchCompleted
-  }
-
-  override def onOutputOperationStarted(
-      outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
-    this.outputOperationStarted = outputOperationStarted
-  }
-
-  override def onOutputOperationCompleted(
-      outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
-    this.outputOperationCompleted = outputOperationCompleted
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
new file mode 100644
index 0000000..0c4a64c
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaDStreamLike, JavaStreamingContext}
+
+/** Exposes streaming test functionality in a Java-friendly way. */
+trait JavaTestBase extends TestSuiteBase {
+
+  /**
+   * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context.
+   * The stream will be derived from the supplied lists of Java objects.
+   */
+  def attachTestInputStream[T](
+      ssc: JavaStreamingContext,
+      data: JList[JList[T]],
+      numPartitions: Int): JavaDStream[T] = {
+    val seqData = data.asScala.map(_.asScala)
+
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
+    new JavaDStream[T](dstream)
+  }
+
+  /**
+   * Attach a provided stream to it's associated StreamingContext as a
+   * [[org.apache.spark.streaming.TestOutputStream]].
+   */
+  def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
+      dstream: JavaDStreamLike[T, This, R]): Unit = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
+    ostream.register()
+  }
+
+  /**
+   * Process all registered streams for a numBatches batches, failing if
+   * numExpectedOutput RDD's are not generated. Generated RDD's are collected
+   * and returned, represented as a list for each batch interval.
+   *
+   * Returns a list of items for each RDD.
+   */
+  def runStreams[V](
+      ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
+    implicit val cm: ClassTag[V] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    ssc.getState()
+    val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
+    res.map(_.asJava).toSeq.asJava
+  }
+
+  /**
+   * Process all registered streams for a numBatches batches, failing if
+   * numExpectedOutput RDD's are not generated. Generated RDD's are collected
+   * and returned, represented as a list for each batch interval.
+   *
+   * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
+   * representing one partition.
+   */
+  def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
+      numExpectedOutput: Int): JList[JList[JList[V]]] = {
+    implicit val cm: ClassTag[V] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
+    res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava
+  }
+}
+
+object JavaTestUtils extends JavaTestBase {
+  override def maxWaitTimeMillis: Int = 20000
+
+}
+
+object JavaCheckpointTestUtils extends JavaTestBase {
+  override def actuallyWait: Boolean = true
+}
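
For readers new to this harness, a hedged sketch of how the Java-friendly helpers are driven (hypothetical driver object; assumes an already-configured `JavaStreamingContext` and uses only the signatures visible in the diff above):

```scala
import java.util.{Arrays => JArrays, List => JList}

import org.apache.spark.streaming.JavaTestUtils
import org.apache.spark.streaming.api.java.JavaStreamingContext

// Hypothetical driver (not in this commit): feed one batch in, collect one batch out.
object HarnessSketch {
  def runOneBatch(ssc: JavaStreamingContext): JList[JList[Int]] = {
    val input = JArrays.asList(JArrays.asList(1, 2, 3))     // a single input batch
    val stream = JavaTestUtils.attachTestInputStream(ssc, input, 1)
    JavaTestUtils.attachTestOutputStream(stream)            // register a collecting sink
    JavaTestUtils.runStreams[Int](ssc, 1, 1)                // run 1 batch, expect 1 output
  }
}
```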

http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
new file mode 100644
index 0000000..cfd4323
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler._
+
+class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
+
+  test("basic") {
+    val listener = new TestJavaStreamingListener()
+    val listenerWrapper = new JavaStreamingListenerWrapper(listener)
+
+    val streamingStarted = StreamingListenerStreamingStarted(1000L)
+    listenerWrapper.onStreamingStarted(streamingStarted)
+    assert(listener.streamingStarted.time === streamingStarted.time)
+
+    val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
+      streamId = 2,
+      name = "test",
+      active = true,
+      location = "localhost",
+      executorId = "1"
+    ))
+    listenerWrapper.onReceiverStarted(receiverStarted)
+    assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo)
+
+    val receiverStopped = StreamingListenerReceiverStopped(ReceiverInfo(
+      streamId = 2,
+      name = "test",
+      active = false,
+      location = "localhost",
+      executorId = "1"
+    ))
+    listenerWrapper.onReceiverStopped(receiverStopped)
+    assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo)
+
+    val receiverError = StreamingListenerReceiverError(ReceiverInfo(
+      streamId = 2,
+      name = "test",
+      active = false,
+      location = "localhost",
+      executorId = "1",
+      lastErrorMessage = "failed",
+      lastError = "failed",
+      lastErrorTime = System.currentTimeMillis()
+    ))
+    listenerWrapper.onReceiverError(receiverError)
+    assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo)
+
+    val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
+      batchTime = Time(1000L),
+      streamIdToInputInfo = Map(
+        0 -> StreamInputInfo(
+          inputStreamId = 0,
+          numRecords = 1000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+        1 -> StreamInputInfo(
+          inputStreamId = 1,
+          numRecords = 2000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+      submissionTime = 1001L,
+      None,
+      None,
+      outputOperationInfos = Map(
+        0 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 0,
+          name = "op1",
+          description = "operation1",
+          startTime = None,
+          endTime = None,
+          failureReason = None),
+        1 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 1,
+          name = "op2",
+          description = "operation2",
+          startTime = None,
+          endTime = None,
+          failureReason = None))
+    ))
+    listenerWrapper.onBatchSubmitted(batchSubmitted)
+    assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo)
+
+    val batchStarted = StreamingListenerBatchStarted(BatchInfo(
+      batchTime = Time(1000L),
+      streamIdToInputInfo = Map(
+        0 -> StreamInputInfo(
+          inputStreamId = 0,
+          numRecords = 1000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+        1 -> StreamInputInfo(
+          inputStreamId = 1,
+          numRecords = 2000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+      submissionTime = 1001L,
+      Some(1002L),
+      None,
+      outputOperationInfos = Map(
+        0 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 0,
+          name = "op1",
+          description = "operation1",
+          startTime = Some(1003L),
+          endTime = None,
+          failureReason = None),
+        1 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 1,
+          name = "op2",
+          description = "operation2",
+          startTime = Some(1005L),
+          endTime = None,
+          failureReason = None))
+    ))
+    listenerWrapper.onBatchStarted(batchStarted)
+    assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)
+
+    val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
+      batchTime = Time(1000L),
+      streamIdToInputInfo = Map(
+        0 -> StreamInputInfo(
+          inputStreamId = 0,
+          numRecords = 1000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+        1 -> StreamInputInfo(
+          inputStreamId = 1,
+          numRecords = 2000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+      submissionTime = 1001L,
+      Some(1002L),
+      Some(1010L),
+      outputOperationInfos = Map(
+        0 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 0,
+          name = "op1",
+          description = "operation1",
+          startTime = Some(1003L),
+          endTime = Some(1004L),
+          failureReason = None),
+        1 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 1,
+          name = "op2",
+          description = "operation2",
+          startTime = Some(1005L),
+          endTime = Some(1010L),
+          failureReason = None))
+    ))
+    listenerWrapper.onBatchCompleted(batchCompleted)
+    assertBatchInfo(listener.batchCompleted.batchInfo, batchCompleted.batchInfo)
+
+    val outputOperationStarted = StreamingListenerOutputOperationStarted(OutputOperationInfo(
+      batchTime = Time(1000L),
+      id = 0,
+      name = "op1",
+      description = "operation1",
+      startTime = Some(1003L),
+      endTime = None,
+      failureReason = None
+    ))
+    listenerWrapper.onOutputOperationStarted(outputOperationStarted)
+    assertOutputOperationInfo(listener.outputOperationStarted.outputOperationInfo,
+      outputOperationStarted.outputOperationInfo)
+
+    val outputOperationCompleted = StreamingListenerOutputOperationCompleted(OutputOperationInfo(
+      batchTime = Time(1000L),
+      id = 0,
+      name = "op1",
+      description = "operation1",
+      startTime = Some(1003L),
+      endTime = Some(1004L),
+      failureReason = None
+    ))
+    listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
+    assertOutputOperationInfo(listener.outputOperationCompleted.outputOperationInfo,
+      outputOperationCompleted.outputOperationInfo)
+  }
+
+  private def assertReceiverInfo(
+      javaReceiverInfo: JavaReceiverInfo, receiverInfo: ReceiverInfo): Unit = {
+    assert(javaReceiverInfo.streamId === receiverInfo.streamId)
+    assert(javaReceiverInfo.name === receiverInfo.name)
+    assert(javaReceiverInfo.active === receiverInfo.active)
+    assert(javaReceiverInfo.location === receiverInfo.location)
+    assert(javaReceiverInfo.executorId === receiverInfo.executorId)
+    assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
+    assert(javaReceiverInfo.lastError === receiverInfo.lastError)
+    assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)
+  }
+
+  private def assertBatchInfo(javaBatchInfo: JavaBatchInfo, batchInfo: BatchInfo): Unit = {
+    assert(javaBatchInfo.batchTime === batchInfo.batchTime)
+    assert(javaBatchInfo.streamIdToInputInfo.size === batchInfo.streamIdToInputInfo.size)
+    batchInfo.streamIdToInputInfo.foreach { case (streamId, streamInputInfo) =>
+      assertStreamingInfo(javaBatchInfo.streamIdToInputInfo.get(streamId), streamInputInfo)
+    }
+    assert(javaBatchInfo.submissionTime === batchInfo.submissionTime)
+    assert(javaBatchInfo.processingStartTime === batchInfo.processingStartTime.getOrElse(-1))
+    assert(javaBatchInfo.processingEndTime === batchInfo.processingEndTime.getOrElse(-1))
+    assert(javaBatchInfo.schedulingDelay === batchInfo.schedulingDelay.getOrElse(-1))
+    assert(javaBatchInfo.processingDelay === batchInfo.processingDelay.getOrElse(-1))
+    assert(javaBatchInfo.totalDelay === batchInfo.totalDelay.getOrElse(-1))
+    assert(javaBatchInfo.numRecords === batchInfo.numRecords)
+    assert(javaBatchInfo.outputOperationInfos.size === batchInfo.outputOperationInfos.size)
+    batchInfo.outputOperationInfos.foreach { case (outputOperationId, outputOperationInfo) =>
+      assertOutputOperationInfo(
+        javaBatchInfo.outputOperationInfos.get(outputOperationId), outputOperationInfo)
+    }
+  }
+
+  private def assertStreamingInfo(
+      javaStreamInputInfo: JavaStreamInputInfo, streamInputInfo: StreamInputInfo): Unit = {
+    assert(javaStreamInputInfo.inputStreamId === streamInputInfo.inputStreamId)
+    assert(javaStreamInputInfo.numRecords === streamInputInfo.numRecords)
+    assert(javaStreamInputInfo.metadata === streamInputInfo.metadata.asJava)
+    assert(javaStreamInputInfo.metadataDescription === streamInputInfo.metadataDescription.orNull)
+  }
+
+  private def assertOutputOperationInfo(
+      javaOutputOperationInfo: JavaOutputOperationInfo,
+      outputOperationInfo: OutputOperationInfo): Unit = {
+    assert(javaOutputOperationInfo.batchTime === outputOperationInfo.batchTime)
+    assert(javaOutputOperationInfo.id === outputOperationInfo.id)
+    assert(javaOutputOperationInfo.name === outputOperationInfo.name)
+    assert(javaOutputOperationInfo.description === outputOperationInfo.description)
+    assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
+    assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
+    assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
+  }
+}
+
+class TestJavaStreamingListener extends JavaStreamingListener {
+
+  var streamingStarted: JavaStreamingListenerStreamingStarted = null
+  var receiverStarted: JavaStreamingListenerReceiverStarted = null
+  var receiverError: JavaStreamingListenerReceiverError = null
+  var receiverStopped: JavaStreamingListenerReceiverStopped = null
+  var batchSubmitted: JavaStreamingListenerBatchSubmitted = null
+  var batchStarted: JavaStreamingListenerBatchStarted = null
+  var batchCompleted: JavaStreamingListenerBatchCompleted = null
+  var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
+  var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
+
+  override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
+    this.streamingStarted = streamingStarted
+  }
+
+  override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
+    this.receiverStarted = receiverStarted
+  }
+
+  override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
+    this.receiverError = receiverError
+  }
+
+  override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
+    this.receiverStopped = receiverStopped
+  }
+
+  override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
+    this.batchSubmitted = batchSubmitted
+  }
+
+  override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
+    this.batchStarted = batchStarted
+  }
+
+  override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
+    this.batchCompleted = batchCompleted
+  }
+
+  override def onOutputOperationStarted(
+      outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
+    this.outputOperationStarted = outputOperationStarted
+  }
+
+  override def onOutputOperationCompleted(
+      outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
+    this.outputOperationCompleted = outputOperationCompleted
+  }
+}
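
Finally, a hedged sketch of where the wrapper fits (hypothetical helper, placed in the same package because the wrapper appears to be streaming-internal; `addStreamingListener` is the standard `StreamingContext` hook):

```scala
package org.apache.spark.streaming.api.java

import org.apache.spark.streaming.StreamingContext

// Hypothetical wiring (not part of this commit): a Java-facing listener is
// adapted to the Scala listener bus by wrapping it, which is what the suite
// above verifies event by event.
object ListenerWiringSketch {
  def register(ssc: StreamingContext, javaListener: JavaStreamingListener): Unit = {
    ssc.addStreamingListener(new JavaStreamingListenerWrapper(javaListener))
  }
}
```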


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org