You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/11/04 11:51:14 UTC
spark git commit: [SPARK-22423][SQL] Scala test source files like
TestHiveSingleton.scala should be in scala source root
Repository: spark
Updated Branches:
refs/heads/master e7adb7d7a -> 7a8412352
[SPARK-22423][SQL] Scala test source files like TestHiveSingleton.scala should be in scala source root
## What changes were proposed in this pull request?
Scala test source files like TestHiveSingleton.scala should be in scala source root
## How was this patch tested?
Just move scala file from java directory to scala directory
No new test case in this PR.
```
renamed: mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala -> mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
renamed: streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala -> streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
renamed: streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala -> streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
renamed: sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
```
Author: xubo245 <60...@qq.com>
Closes #19639 from xubo245/scalaDirectory.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a841235
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a841235
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a841235
Branch: refs/heads/master
Commit: 7a8412352e3aaf14527f97c82d0d62f9de39e753
Parents: e7adb7d
Author: xubo245 <60...@qq.com>
Authored: Sat Nov 4 11:51:10 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Nov 4 11:51:10 2017 +0000
----------------------------------------------------------------------
.../spark/ml/util/IdentifiableSuite.scala | 41 ---
.../spark/ml/util/IdentifiableSuite.scala | 41 +++
.../spark/sql/hive/test/TestHiveSingleton.scala | 42 ---
.../spark/sql/hive/test/TestHiveSingleton.scala | 42 +++
.../apache/spark/streaming/JavaTestUtils.scala | 99 ------
.../JavaStreamingListenerWrapperSuite.scala | 303 -------------------
.../apache/spark/streaming/JavaTestUtils.scala | 99 ++++++
.../JavaStreamingListenerWrapperSuite.scala | 303 +++++++++++++++++++
8 files changed, 485 insertions(+), 485 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala b/mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala
deleted file mode 100644
index 878bc66..0000000
--- a/mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml.util
-
-import org.apache.spark.SparkFunSuite
-
-class IdentifiableSuite extends SparkFunSuite {
-
- import IdentifiableSuite.Test
-
- test("Identifiable") {
- val test0 = new Test("test_0")
- assert(test0.uid === "test_0")
-
- val test1 = new Test
- assert(test1.uid.startsWith("test_"))
- }
-}
-
-object IdentifiableSuite {
-
- class Test(override val uid: String) extends Identifiable {
- def this() = this(Identifiable.randomUID("test"))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
new file mode 100644
index 0000000..878bc66
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.util
+
+import org.apache.spark.SparkFunSuite
+
+class IdentifiableSuite extends SparkFunSuite {
+
+ import IdentifiableSuite.Test
+
+ test("Identifiable") {
+ val test0 = new Test("test_0")
+ assert(test0.uid === "test_0")
+
+ val test1 = new Test
+ assert(test1.uid.startsWith("test_"))
+ }
+}
+
+object IdentifiableSuite {
+
+ class Test(override val uid: String) extends Identifiable {
+ def this() = this(Identifiable.randomUID("test"))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
deleted file mode 100644
index df7988f..0000000
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.test
-
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.client.HiveClient
-
-
-trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
- protected val spark: SparkSession = TestHive.sparkSession
- protected val hiveContext: TestHiveContext = TestHive
- protected val hiveClient: HiveClient =
- spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-
- protected override def afterAll(): Unit = {
- try {
- hiveContext.reset()
- } finally {
- super.afterAll()
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
new file mode 100644
index 0000000..df7988f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.test
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
+ protected val spark: SparkSession = TestHive.sparkSession
+ protected val hiveContext: TestHiveContext = TestHive
+ protected val hiveClient: HiveClient =
+ spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+ protected override def afterAll(): Unit = {
+ try {
+ hiveContext.reset()
+ } finally {
+ super.afterAll()
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
deleted file mode 100644
index 0c4a64c..0000000
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.util.{List => JList}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.api.java.JavaRDDLike
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaDStreamLike, JavaStreamingContext}
-
-/** Exposes streaming test functionality in a Java-friendly way. */
-trait JavaTestBase extends TestSuiteBase {
-
- /**
- * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context.
- * The stream will be derived from the supplied lists of Java objects.
- */
- def attachTestInputStream[T](
- ssc: JavaStreamingContext,
- data: JList[JList[T]],
- numPartitions: Int): JavaDStream[T] = {
- val seqData = data.asScala.map(_.asScala)
-
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
- new JavaDStream[T](dstream)
- }
-
- /**
- * Attach a provided stream to it's associated StreamingContext as a
- * [[org.apache.spark.streaming.TestOutputStream]].
- */
- def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
- dstream: JavaDStreamLike[T, This, R]): Unit = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
- ostream.register()
- }
-
- /**
- * Process all registered streams for a numBatches batches, failing if
- * numExpectedOutput RDD's are not generated. Generated RDD's are collected
- * and returned, represented as a list for each batch interval.
- *
- * Returns a list of items for each RDD.
- */
- def runStreams[V](
- ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
- implicit val cm: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
- ssc.getState()
- val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
- res.map(_.asJava).toSeq.asJava
- }
-
- /**
- * Process all registered streams for a numBatches batches, failing if
- * numExpectedOutput RDD's are not generated. Generated RDD's are collected
- * and returned, represented as a list for each batch interval.
- *
- * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
- * representing one partition.
- */
- def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
- numExpectedOutput: Int): JList[JList[JList[V]]] = {
- implicit val cm: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
- val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
- res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava
- }
-}
-
-object JavaTestUtils extends JavaTestBase {
- override def maxWaitTimeMillis: Int = 20000
-
-}
-
-object JavaCheckpointTestUtils extends JavaTestBase {
- override def actuallyWait: Boolean = true
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
deleted file mode 100644
index cfd4323..0000000
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.api.java
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.scheduler._
-
-class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
-
- test("basic") {
- val listener = new TestJavaStreamingListener()
- val listenerWrapper = new JavaStreamingListenerWrapper(listener)
-
- val streamingStarted = StreamingListenerStreamingStarted(1000L)
- listenerWrapper.onStreamingStarted(streamingStarted)
- assert(listener.streamingStarted.time === streamingStarted.time)
-
- val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
- streamId = 2,
- name = "test",
- active = true,
- location = "localhost",
- executorId = "1"
- ))
- listenerWrapper.onReceiverStarted(receiverStarted)
- assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo)
-
- val receiverStopped = StreamingListenerReceiverStopped(ReceiverInfo(
- streamId = 2,
- name = "test",
- active = false,
- location = "localhost",
- executorId = "1"
- ))
- listenerWrapper.onReceiverStopped(receiverStopped)
- assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo)
-
- val receiverError = StreamingListenerReceiverError(ReceiverInfo(
- streamId = 2,
- name = "test",
- active = false,
- location = "localhost",
- executorId = "1",
- lastErrorMessage = "failed",
- lastError = "failed",
- lastErrorTime = System.currentTimeMillis()
- ))
- listenerWrapper.onReceiverError(receiverError)
- assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo)
-
- val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
- batchTime = Time(1000L),
- streamIdToInputInfo = Map(
- 0 -> StreamInputInfo(
- inputStreamId = 0,
- numRecords = 1000,
- metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
- 1 -> StreamInputInfo(
- inputStreamId = 1,
- numRecords = 2000,
- metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
- submissionTime = 1001L,
- None,
- None,
- outputOperationInfos = Map(
- 0 -> OutputOperationInfo(
- batchTime = Time(1000L),
- id = 0,
- name = "op1",
- description = "operation1",
- startTime = None,
- endTime = None,
- failureReason = None),
- 1 -> OutputOperationInfo(
- batchTime = Time(1000L),
- id = 1,
- name = "op2",
- description = "operation2",
- startTime = None,
- endTime = None,
- failureReason = None))
- ))
- listenerWrapper.onBatchSubmitted(batchSubmitted)
- assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo)
-
- val batchStarted = StreamingListenerBatchStarted(BatchInfo(
- batchTime = Time(1000L),
- streamIdToInputInfo = Map(
- 0 -> StreamInputInfo(
- inputStreamId = 0,
- numRecords = 1000,
- metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
- 1 -> StreamInputInfo(
- inputStreamId = 1,
- numRecords = 2000,
- metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
- submissionTime = 1001L,
- Some(1002L),
- None,
- outputOperationInfos = Map(
- 0 -> OutputOperationInfo(
- batchTime = Time(1000L),
- id = 0,
- name = "op1",
- description = "operation1",
- startTime = Some(1003L),
- endTime = None,
- failureReason = None),
- 1 -> OutputOperationInfo(
- batchTime = Time(1000L),
- id = 1,
- name = "op2",
- description = "operation2",
- startTime = Some(1005L),
- endTime = None,
- failureReason = None))
- ))
- listenerWrapper.onBatchStarted(batchStarted)
- assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)
-
- val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
- batchTime = Time(1000L),
- streamIdToInputInfo = Map(
- 0 -> StreamInputInfo(
- inputStreamId = 0,
- numRecords = 1000,
- metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
- 1 -> StreamInputInfo(
- inputStreamId = 1,
- numRecords = 2000,
- metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
- submissionTime = 1001L,
- Some(1002L),
- Some(1010L),
- outputOperationInfos = Map(
- 0 -> OutputOperationInfo(
- batchTime = Time(1000L),
- id = 0,
- name = "op1",
- description = "operation1",
- startTime = Some(1003L),
- endTime = Some(1004L),
- failureReason = None),
- 1 -> OutputOperationInfo(
- batchTime = Time(1000L),
- id = 1,
- name = "op2",
- description = "operation2",
- startTime = Some(1005L),
- endTime = Some(1010L),
- failureReason = None))
- ))
- listenerWrapper.onBatchCompleted(batchCompleted)
- assertBatchInfo(listener.batchCompleted.batchInfo, batchCompleted.batchInfo)
-
- val outputOperationStarted = StreamingListenerOutputOperationStarted(OutputOperationInfo(
- batchTime = Time(1000L),
- id = 0,
- name = "op1",
- description = "operation1",
- startTime = Some(1003L),
- endTime = None,
- failureReason = None
- ))
- listenerWrapper.onOutputOperationStarted(outputOperationStarted)
- assertOutputOperationInfo(listener.outputOperationStarted.outputOperationInfo,
- outputOperationStarted.outputOperationInfo)
-
- val outputOperationCompleted = StreamingListenerOutputOperationCompleted(OutputOperationInfo(
- batchTime = Time(1000L),
- id = 0,
- name = "op1",
- description = "operation1",
- startTime = Some(1003L),
- endTime = Some(1004L),
- failureReason = None
- ))
- listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
- assertOutputOperationInfo(listener.outputOperationCompleted.outputOperationInfo,
- outputOperationCompleted.outputOperationInfo)
- }
-
- private def assertReceiverInfo(
- javaReceiverInfo: JavaReceiverInfo, receiverInfo: ReceiverInfo): Unit = {
- assert(javaReceiverInfo.streamId === receiverInfo.streamId)
- assert(javaReceiverInfo.name === receiverInfo.name)
- assert(javaReceiverInfo.active === receiverInfo.active)
- assert(javaReceiverInfo.location === receiverInfo.location)
- assert(javaReceiverInfo.executorId === receiverInfo.executorId)
- assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
- assert(javaReceiverInfo.lastError === receiverInfo.lastError)
- assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)
- }
-
- private def assertBatchInfo(javaBatchInfo: JavaBatchInfo, batchInfo: BatchInfo): Unit = {
- assert(javaBatchInfo.batchTime === batchInfo.batchTime)
- assert(javaBatchInfo.streamIdToInputInfo.size === batchInfo.streamIdToInputInfo.size)
- batchInfo.streamIdToInputInfo.foreach { case (streamId, streamInputInfo) =>
- assertStreamingInfo(javaBatchInfo.streamIdToInputInfo.get(streamId), streamInputInfo)
- }
- assert(javaBatchInfo.submissionTime === batchInfo.submissionTime)
- assert(javaBatchInfo.processingStartTime === batchInfo.processingStartTime.getOrElse(-1))
- assert(javaBatchInfo.processingEndTime === batchInfo.processingEndTime.getOrElse(-1))
- assert(javaBatchInfo.schedulingDelay === batchInfo.schedulingDelay.getOrElse(-1))
- assert(javaBatchInfo.processingDelay === batchInfo.processingDelay.getOrElse(-1))
- assert(javaBatchInfo.totalDelay === batchInfo.totalDelay.getOrElse(-1))
- assert(javaBatchInfo.numRecords === batchInfo.numRecords)
- assert(javaBatchInfo.outputOperationInfos.size === batchInfo.outputOperationInfos.size)
- batchInfo.outputOperationInfos.foreach { case (outputOperationId, outputOperationInfo) =>
- assertOutputOperationInfo(
- javaBatchInfo.outputOperationInfos.get(outputOperationId), outputOperationInfo)
- }
- }
-
- private def assertStreamingInfo(
- javaStreamInputInfo: JavaStreamInputInfo, streamInputInfo: StreamInputInfo): Unit = {
- assert(javaStreamInputInfo.inputStreamId === streamInputInfo.inputStreamId)
- assert(javaStreamInputInfo.numRecords === streamInputInfo.numRecords)
- assert(javaStreamInputInfo.metadata === streamInputInfo.metadata.asJava)
- assert(javaStreamInputInfo.metadataDescription === streamInputInfo.metadataDescription.orNull)
- }
-
- private def assertOutputOperationInfo(
- javaOutputOperationInfo: JavaOutputOperationInfo,
- outputOperationInfo: OutputOperationInfo): Unit = {
- assert(javaOutputOperationInfo.batchTime === outputOperationInfo.batchTime)
- assert(javaOutputOperationInfo.id === outputOperationInfo.id)
- assert(javaOutputOperationInfo.name === outputOperationInfo.name)
- assert(javaOutputOperationInfo.description === outputOperationInfo.description)
- assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
- assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
- assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
- }
-}
-
-class TestJavaStreamingListener extends JavaStreamingListener {
-
- var streamingStarted: JavaStreamingListenerStreamingStarted = null
- var receiverStarted: JavaStreamingListenerReceiverStarted = null
- var receiverError: JavaStreamingListenerReceiverError = null
- var receiverStopped: JavaStreamingListenerReceiverStopped = null
- var batchSubmitted: JavaStreamingListenerBatchSubmitted = null
- var batchStarted: JavaStreamingListenerBatchStarted = null
- var batchCompleted: JavaStreamingListenerBatchCompleted = null
- var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
- var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
-
- override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
- this.streamingStarted = streamingStarted
- }
-
- override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
- this.receiverStarted = receiverStarted
- }
-
- override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
- this.receiverError = receiverError
- }
-
- override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
- this.receiverStopped = receiverStopped
- }
-
- override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
- this.batchSubmitted = batchSubmitted
- }
-
- override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
- this.batchStarted = batchStarted
- }
-
- override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
- this.batchCompleted = batchCompleted
- }
-
- override def onOutputOperationStarted(
- outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
- this.outputOperationStarted = outputOperationStarted
- }
-
- override def onOutputOperationCompleted(
- outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
- this.outputOperationCompleted = outputOperationCompleted
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
new file mode 100644
index 0000000..0c4a64c
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaDStreamLike, JavaStreamingContext}
+
+/** Exposes streaming test functionality in a Java-friendly way. */
+trait JavaTestBase extends TestSuiteBase {
+
+ /**
+ * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context.
+ * The stream will be derived from the supplied lists of Java objects.
+ */
+ def attachTestInputStream[T](
+ ssc: JavaStreamingContext,
+ data: JList[JList[T]],
+ numPartitions: Int): JavaDStream[T] = {
+ val seqData = data.asScala.map(_.asScala)
+
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
+ new JavaDStream[T](dstream)
+ }
+
+ /**
+ * Attach a provided stream to it's associated StreamingContext as a
+ * [[org.apache.spark.streaming.TestOutputStream]].
+ */
+ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
+ dstream: JavaDStreamLike[T, This, R]): Unit = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
+ ostream.register()
+ }
+
+ /**
+ * Process all registered streams for a numBatches batches, failing if
+ * numExpectedOutput RDD's are not generated. Generated RDD's are collected
+ * and returned, represented as a list for each batch interval.
+ *
+ * Returns a list of items for each RDD.
+ */
+ def runStreams[V](
+ ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
+ implicit val cm: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ ssc.getState()
+ val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
+ res.map(_.asJava).toSeq.asJava
+ }
+
+ /**
+ * Process all registered streams for a numBatches batches, failing if
+ * numExpectedOutput RDD's are not generated. Generated RDD's are collected
+ * and returned, represented as a list for each batch interval.
+ *
+ * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
+ * representing one partition.
+ */
+ def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
+ numExpectedOutput: Int): JList[JList[JList[V]]] = {
+ implicit val cm: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
+ res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava
+ }
+}
+
+object JavaTestUtils extends JavaTestBase {
+ override def maxWaitTimeMillis: Int = 20000
+
+}
+
+object JavaCheckpointTestUtils extends JavaTestBase {
+ override def actuallyWait: Boolean = true
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a841235/streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
new file mode 100644
index 0000000..cfd4323
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler._
+
+class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
+
+ test("basic") {
+ val listener = new TestJavaStreamingListener()
+ val listenerWrapper = new JavaStreamingListenerWrapper(listener)
+
+ val streamingStarted = StreamingListenerStreamingStarted(1000L)
+ listenerWrapper.onStreamingStarted(streamingStarted)
+ assert(listener.streamingStarted.time === streamingStarted.time)
+
+ val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
+ streamId = 2,
+ name = "test",
+ active = true,
+ location = "localhost",
+ executorId = "1"
+ ))
+ listenerWrapper.onReceiverStarted(receiverStarted)
+ assertReceiverInfo(listener.receiverStarted.receiverInfo, receiverStarted.receiverInfo)
+
+ val receiverStopped = StreamingListenerReceiverStopped(ReceiverInfo(
+ streamId = 2,
+ name = "test",
+ active = false,
+ location = "localhost",
+ executorId = "1"
+ ))
+ listenerWrapper.onReceiverStopped(receiverStopped)
+ assertReceiverInfo(listener.receiverStopped.receiverInfo, receiverStopped.receiverInfo)
+
+ val receiverError = StreamingListenerReceiverError(ReceiverInfo(
+ streamId = 2,
+ name = "test",
+ active = false,
+ location = "localhost",
+ executorId = "1",
+ lastErrorMessage = "failed",
+ lastError = "failed",
+ lastErrorTime = System.currentTimeMillis()
+ ))
+ listenerWrapper.onReceiverError(receiverError)
+ assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo)
+
+ val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
+ batchTime = Time(1000L),
+ streamIdToInputInfo = Map(
+ 0 -> StreamInputInfo(
+ inputStreamId = 0,
+ numRecords = 1000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+ 1 -> StreamInputInfo(
+ inputStreamId = 1,
+ numRecords = 2000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+ submissionTime = 1001L,
+ None,
+ None,
+ outputOperationInfos = Map(
+ 0 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = None,
+ endTime = None,
+ failureReason = None),
+ 1 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 1,
+ name = "op2",
+ description = "operation2",
+ startTime = None,
+ endTime = None,
+ failureReason = None))
+ ))
+ listenerWrapper.onBatchSubmitted(batchSubmitted)
+ assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo)
+
+ val batchStarted = StreamingListenerBatchStarted(BatchInfo(
+ batchTime = Time(1000L),
+ streamIdToInputInfo = Map(
+ 0 -> StreamInputInfo(
+ inputStreamId = 0,
+ numRecords = 1000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+ 1 -> StreamInputInfo(
+ inputStreamId = 1,
+ numRecords = 2000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+ submissionTime = 1001L,
+ Some(1002L),
+ None,
+ outputOperationInfos = Map(
+ 0 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = Some(1003L),
+ endTime = None,
+ failureReason = None),
+ 1 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 1,
+ name = "op2",
+ description = "operation2",
+ startTime = Some(1005L),
+ endTime = None,
+ failureReason = None))
+ ))
+ listenerWrapper.onBatchStarted(batchStarted)
+ assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)
+
+ val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
+ batchTime = Time(1000L),
+ streamIdToInputInfo = Map(
+ 0 -> StreamInputInfo(
+ inputStreamId = 0,
+ numRecords = 1000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver1")),
+ 1 -> StreamInputInfo(
+ inputStreamId = 1,
+ numRecords = 2000,
+ metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "receiver2"))),
+ submissionTime = 1001L,
+ Some(1002L),
+ Some(1010L),
+ outputOperationInfos = Map(
+ 0 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = Some(1003L),
+ endTime = Some(1004L),
+ failureReason = None),
+ 1 -> OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 1,
+ name = "op2",
+ description = "operation2",
+ startTime = Some(1005L),
+ endTime = Some(1010L),
+ failureReason = None))
+ ))
+ listenerWrapper.onBatchCompleted(batchCompleted)
+ assertBatchInfo(listener.batchCompleted.batchInfo, batchCompleted.batchInfo)
+
+ val outputOperationStarted = StreamingListenerOutputOperationStarted(OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = Some(1003L),
+ endTime = None,
+ failureReason = None
+ ))
+ listenerWrapper.onOutputOperationStarted(outputOperationStarted)
+ assertOutputOperationInfo(listener.outputOperationStarted.outputOperationInfo,
+ outputOperationStarted.outputOperationInfo)
+
+ val outputOperationCompleted = StreamingListenerOutputOperationCompleted(OutputOperationInfo(
+ batchTime = Time(1000L),
+ id = 0,
+ name = "op1",
+ description = "operation1",
+ startTime = Some(1003L),
+ endTime = Some(1004L),
+ failureReason = None
+ ))
+ listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
+ assertOutputOperationInfo(listener.outputOperationCompleted.outputOperationInfo,
+ outputOperationCompleted.outputOperationInfo)
+ }
+
+ private def assertReceiverInfo(
+ javaReceiverInfo: JavaReceiverInfo, receiverInfo: ReceiverInfo): Unit = {
+ assert(javaReceiverInfo.streamId === receiverInfo.streamId)
+ assert(javaReceiverInfo.name === receiverInfo.name)
+ assert(javaReceiverInfo.active === receiverInfo.active)
+ assert(javaReceiverInfo.location === receiverInfo.location)
+ assert(javaReceiverInfo.executorId === receiverInfo.executorId)
+ assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
+ assert(javaReceiverInfo.lastError === receiverInfo.lastError)
+ assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)
+ }
+
+ private def assertBatchInfo(javaBatchInfo: JavaBatchInfo, batchInfo: BatchInfo): Unit = {
+ assert(javaBatchInfo.batchTime === batchInfo.batchTime)
+ assert(javaBatchInfo.streamIdToInputInfo.size === batchInfo.streamIdToInputInfo.size)
+ batchInfo.streamIdToInputInfo.foreach { case (streamId, streamInputInfo) =>
+ assertStreamingInfo(javaBatchInfo.streamIdToInputInfo.get(streamId), streamInputInfo)
+ }
+ assert(javaBatchInfo.submissionTime === batchInfo.submissionTime)
+ assert(javaBatchInfo.processingStartTime === batchInfo.processingStartTime.getOrElse(-1))
+ assert(javaBatchInfo.processingEndTime === batchInfo.processingEndTime.getOrElse(-1))
+ assert(javaBatchInfo.schedulingDelay === batchInfo.schedulingDelay.getOrElse(-1))
+ assert(javaBatchInfo.processingDelay === batchInfo.processingDelay.getOrElse(-1))
+ assert(javaBatchInfo.totalDelay === batchInfo.totalDelay.getOrElse(-1))
+ assert(javaBatchInfo.numRecords === batchInfo.numRecords)
+ assert(javaBatchInfo.outputOperationInfos.size === batchInfo.outputOperationInfos.size)
+ batchInfo.outputOperationInfos.foreach { case (outputOperationId, outputOperationInfo) =>
+ assertOutputOperationInfo(
+ javaBatchInfo.outputOperationInfos.get(outputOperationId), outputOperationInfo)
+ }
+ }
+
+ private def assertStreamingInfo(
+ javaStreamInputInfo: JavaStreamInputInfo, streamInputInfo: StreamInputInfo): Unit = {
+ assert(javaStreamInputInfo.inputStreamId === streamInputInfo.inputStreamId)
+ assert(javaStreamInputInfo.numRecords === streamInputInfo.numRecords)
+ assert(javaStreamInputInfo.metadata === streamInputInfo.metadata.asJava)
+ assert(javaStreamInputInfo.metadataDescription === streamInputInfo.metadataDescription.orNull)
+ }
+
+ private def assertOutputOperationInfo(
+ javaOutputOperationInfo: JavaOutputOperationInfo,
+ outputOperationInfo: OutputOperationInfo): Unit = {
+ assert(javaOutputOperationInfo.batchTime === outputOperationInfo.batchTime)
+ assert(javaOutputOperationInfo.id === outputOperationInfo.id)
+ assert(javaOutputOperationInfo.name === outputOperationInfo.name)
+ assert(javaOutputOperationInfo.description === outputOperationInfo.description)
+ assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
+ assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
+ assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
+ }
+}
+
+class TestJavaStreamingListener extends JavaStreamingListener {
+
+ var streamingStarted: JavaStreamingListenerStreamingStarted = null
+ var receiverStarted: JavaStreamingListenerReceiverStarted = null
+ var receiverError: JavaStreamingListenerReceiverError = null
+ var receiverStopped: JavaStreamingListenerReceiverStopped = null
+ var batchSubmitted: JavaStreamingListenerBatchSubmitted = null
+ var batchStarted: JavaStreamingListenerBatchStarted = null
+ var batchCompleted: JavaStreamingListenerBatchCompleted = null
+ var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
+ var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
+
+ override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
+ this.streamingStarted = streamingStarted
+ }
+
+ override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
+ this.receiverStarted = receiverStarted
+ }
+
+ override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
+ this.receiverError = receiverError
+ }
+
+ override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
+ this.receiverStopped = receiverStopped
+ }
+
+ override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
+ this.batchSubmitted = batchSubmitted
+ }
+
+ override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
+ this.batchStarted = batchStarted
+ }
+
+ override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
+ this.batchCompleted = batchCompleted
+ }
+
+ override def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
+ this.outputOperationStarted = outputOperationStarted
+ }
+
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
+ this.outputOperationCompleted = outputOperationCompleted
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org