Posted to commits@spark.apache.org by pw...@apache.org on 2014/11/19 23:18:15 UTC

spark git commit: SPARK-3962 Marked scope as provided for external projects.

Repository: spark
Updated Branches:
  refs/heads/master 0df02ca46 -> 1c938413b


SPARK-3962 Marked scope as provided for external projects.

Somehow the maven shade plugin gets stuck in an infinite loop while creating the effective pom.
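
Marking the dependency as "provided" keeps spark-streaming on the compile classpath of each external module without making it a transitive, bundled dependency. As an illustrative aside (not part of this change), the same idea expressed in sbt syntax would look roughly like this; the version string is a placeholder:

  // build.sbt sketch: compile against spark-streaming, but do not bundle it
  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided"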

Author: Prashant Sharma <pr...@imaginea.com>
Author: Prashant Sharma <sc...@gmail.com>

Closes #2959 from ScrapCodes/SPARK-3962/scope-provided and squashes the following commits:

994d1d3 [Prashant Sharma] Fixed failing flume tests
270b4fb [Prashant Sharma] Removed most of the unused code.
bb3bbfd [Prashant Sharma] SPARK-3962 Marked scope as provided for external.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c938413
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c938413
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c938413

Branch: refs/heads/master
Commit: 1c938413ba5579034675f1b4ea3b8fd0e47dd8d6
Parents: 0df02ca
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Wed Nov 19 14:18:10 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Nov 19 14:18:10 2014 -0800

----------------------------------------------------------------------
 external/flume/pom.xml                          |  8 +---
 .../streaming/LocalJavaStreamingContext.java    | 40 ++++++++++++++++
 .../spark/streaming/TestOutputStream.scala      | 48 ++++++++++++++++++++
 .../flume/FlumePollingStreamSuite.scala         | 23 ++++++++--
 external/kafka/pom.xml                          |  8 +---
 external/mqtt/pom.xml                           |  8 +---
 .../streaming/LocalJavaStreamingContext.java    | 40 ++++++++++++++++
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  | 12 ++++-
 external/twitter/pom.xml                        |  8 +---
 .../streaming/LocalJavaStreamingContext.java    | 40 ++++++++++++++++
 .../streaming/twitter/TwitterStreamSuite.scala  | 18 ++++++--
 external/zeromq/pom.xml                         |  8 +---
 .../streaming/LocalJavaStreamingContext.java    | 40 ++++++++++++++++
 .../streaming/zeromq/ZeroMQStreamSuite.scala    | 11 ++++-
 14 files changed, 264 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/flume/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index fae84a3..a682f0e 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -39,6 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -46,13 +47,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-sdk</artifactId>
       <version>${flume.version}</version>

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..6e1f019
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+    protected transient JavaStreamingContext ssc;
+
+    @Before
+    public void setUp() {
+        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        ssc.checkpoint("checkpoint");
+    }
+
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
new file mode 100644
index 0000000..1a90000
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io.{IOException, ObjectInputStream}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.util.Utils
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/**
+ * This is an output stream just for the test suites. All the output is collected into an
+ * ArrayBuffer. This buffer is wiped clean when restored from a checkpoint.
+ *
+ * The buffer contains a sequence of RDDs, each containing a sequence of items.
+ */
+class TestOutputStream[T: ClassTag](parent: DStream[T],
+    val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
+  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+    val collected = rdd.collect()
+    output += collected
+  }) {
+
+  // This is to clear the output buffer every time it is read from a checkpoint
+  @throws(classOf[IOException])
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
+    ois.defaultReadObject()
+    output.clear()
+  }
+}

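For context, here is a minimal sketch (not taken from this commit) of how a suite typically uses the TestOutputStream helper above: attach it to any DStream, call register() so output jobs are actually generated, then read the collected batches from its output buffer. The queue-backed input, suite name, and package below are hypothetical placeholders.

  package org.apache.spark.streaming.example

  import scala.collection.mutable

  import org.scalatest.FunSuite

  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}

  class TestOutputStreamSketchSuite extends FunSuite {

    test("collect batches with TestOutputStream") {
      val conf = new SparkConf().setMaster("local[2]").setAppName("sketch")
      val ssc = new StreamingContext(conf, Seconds(1))

      // Feed two pre-built RDDs through a queue-backed input stream.
      val queue = new mutable.Queue[RDD[Int]]()
      queue += ssc.sparkContext.makeRDD(1 to 5)
      queue += ssc.sparkContext.makeRDD(6 to 10)
      val input = ssc.queueStream(queue)

      // Attach the collector; register() marks it as an output stream so jobs run.
      val outputStream = new TestOutputStream(input)
      outputStream.register()

      ssc.start()
      Thread.sleep(3000)   // crude wait for a few 1-second batches; fine for a sketch
      ssc.stop()

      // Each entry in `output` is the collected contents of one batch's RDD.
      assert(outputStream.output.flatten.nonEmpty)
    }
  }
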
http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 475026e..b57a1c7 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -20,9 +20,6 @@ package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
 import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
-import java.util.Random
-
-import org.apache.spark.TestUtils
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -32,20 +29,35 @@ import org.apache.flume.channel.MemoryChannel
 import org.apache.flume.conf.Configurables
 import org.apache.flume.event.EventBuilder
 
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
+import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
 import org.apache.spark.streaming.flume.sink._
 import org.apache.spark.util.Utils
 
-class FlumePollingStreamSuite extends TestSuiteBase {
+class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging {
 
   val batchCount = 5
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
   val channelCapacity = 5000
   val maxAttempts = 5
+  val batchDuration = Seconds(1)
+
+  val conf = new SparkConf()
+    .setMaster("local[2]")
+    .setAppName(this.getClass.getSimpleName)
+
+  def beforeFunction() {
+    logInfo("Using manual clock")
+    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  }
+
+  before(beforeFunction())
 
   test("flume polling test") {
     testMultipleTimes(testFlumePolling)
@@ -229,4 +241,5 @@ class FlumePollingStreamSuite extends TestSuiteBase {
       null
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index ce74b0b..b3f4447 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -39,13 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 8c5bd79..7038067 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -39,13 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.eclipse.paho</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..6e1f019
--- /dev/null
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+    protected transient JavaStreamingContext ssc;
+
+    @Before
+    public void setUp() {
+        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        ssc.checkpoint("checkpoint");
+    }
+
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 467fd26..84595ac 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -17,11 +17,19 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.scalatest.FunSuite
+
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
-class MQTTStreamSuite extends TestSuiteBase {
+class MQTTStreamSuite extends FunSuite {
+
+  val batchDuration = Seconds(1)
+
+  private val master: String = "local[2]"
+
+  private val framework: String = this.getClass.getSimpleName
 
   test("mqtt input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
index 399c30a..000ace1 100644
--- a/external/twitter/pom.xml
+++ b/external/twitter/pom.xml
@@ -39,13 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.twitter4j</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..6e1f019
--- /dev/null
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+    protected transient JavaStreamingContext ssc;
+
+    @Before
+    public void setUp() {
+        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        ssc.checkpoint("checkpoint");
+    }
+
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 93741e0..9ee57d7 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -17,13 +17,23 @@
 
 package org.apache.spark.streaming.twitter
 
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
-import org.apache.spark.storage.StorageLevel
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import twitter4j.Status
 import twitter4j.auth.{NullAuthorization, Authorization}
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import twitter4j.Status
 
-class TwitterStreamSuite extends TestSuiteBase {
+class TwitterStreamSuite extends FunSuite with BeforeAndAfter with Logging {
+
+  val batchDuration = Seconds(1)
+
+  private val master: String = "local[2]"
+
+  private val framework: String = this.getClass.getSimpleName
 
   test("twitter input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index e453916..29c4520 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -39,13 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..6e1f019
--- /dev/null
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+    protected transient JavaStreamingContext ssc;
+
+    @Before
+    public void setUp() {
+        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        ssc.checkpoint("checkpoint");
+    }
+
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1c938413/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index cc10ff6..a7566e7 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -20,12 +20,19 @@ package org.apache.spark.streaming.zeromq
 import akka.actor.SupervisorStrategy
 import akka.util.ByteString
 import akka.zeromq.Subscribe
+import org.scalatest.FunSuite
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
-class ZeroMQStreamSuite extends TestSuiteBase {
+class ZeroMQStreamSuite extends FunSuite {
+
+  val batchDuration = Seconds(1)
+
+  private val master: String = "local[2]"
+
+  private val framework: String = this.getClass.getSimpleName
 
   test("zeromq input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)

