Posted to commits@spark.apache.org by pw...@apache.org on 2014/10/10 09:47:08 UTC

git commit: [SPARK-2805] Upgrade Akka to 2.3.4

Repository: spark
Updated Branches:
  refs/heads/master 6f98902a3 -> 411cf29ff


[SPARK-2805] Upgrade Akka to 2.3.4

This is a second revision of the Akka upgrade (merged earlier, but reverted). I made one additional change relative to the previous attempt: I also upgrade Hive, to deal with a compatibility issue related to the protocol buffers library.

Author: Anand Avati <av...@redhat.com>
Author: Patrick Wendell <pw...@gmail.com>

Closes #2752 from pwendell/akka-upgrade and squashes the following commits:

4c7ca3f [Patrick Wendell] Upgrading to new hive->protobuf version
57a2315 [Anand Avati] SPARK-1812: streaming - remove tests which depend on akka.actor.IO
2a551d3 [Anand Avati] SPARK-1812: core - upgrade to akka 2.3.4


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/411cf29f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/411cf29f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/411cf29f

Branch: refs/heads/master
Commit: 411cf29fff011561f0093bb6101af87842828369
Parents: 6f98902
Author: Anand Avati <av...@redhat.com>
Authored: Fri Oct 10 00:46:56 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Fri Oct 10 00:46:56 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/Client.scala  |  2 +-
 .../apache/spark/deploy/client/AppClient.scala  |  2 +-
 .../spark/deploy/worker/WorkerWatcher.scala     |  2 +-
 .../apache/spark/MapOutputTrackerSuite.scala    |  4 +-
 pom.xml                                         |  4 +-
 .../spark/streaming/InputStreamsSuite.scala     | 71 --------------------
 6 files changed, 7 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/411cf29f/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 065ddda..f2687ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       System.exit(-1)
 
-    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
+    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       println(s"Cause was: $cause")
       System.exit(-1)
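
For context on this and the next two hunks: Akka 2.3 adds a fifth field (a log level) to AssociationErrorEvent, so every 2.2-era four-field pattern match stops compiling, and each match site simply absorbs the new field with a wildcard. A minimal standalone sketch of the 2.3 shape (illustrative code, not part of this patch):

    import akka.remote.AssociationErrorEvent

    def describeError(event: AssociationErrorEvent): String = event match {
      // Akka 2.3 fields: cause, localAddress, remoteAddress, inbound, logLevel
      case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
        s"association with $remoteAddress failed: $cause"
    }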

http://git-wip-us.apache.org/repos/asf/spark/blob/411cf29f/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 3279005..98a93d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -154,7 +154,7 @@ private[spark] class AppClient(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 
-      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+      case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
         logWarning(s"Could not connect to $address: $cause")
 
       case StopAppClient =>

http://git-wip-us.apache.org/repos/asf/spark/blob/411cf29f/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 6d0d0bb..63a8ac8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
     case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
       logInfo(s"Successfully connected to $workerUrl")
 
-    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
+    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
         if isWorker(remoteAddress) =>
       // These logs may not be seen if the worker (and associated pipe) has died
       logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
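
These handlers only see association errors because the watcher subscribes to remoting lifecycle events on the actor system's event stream. A rough sketch of that wiring under Akka 2.3 (simplified, hypothetical actor, not the full WorkerWatcher):

    import akka.actor.Actor
    import akka.remote.{AssociationErrorEvent, RemotingLifecycleEvent}

    class LifecycleWatcher extends Actor {
      override def preStart(): Unit =
        // deliver all remoting lifecycle events, including association errors
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      def receive = {
        case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
          println(s"connection to $remoteAddress failed: $cause")
        case _ => // ignore other lifecycle events
      }
    }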

http://git-wip-us.apache.org/repos/asf/spark/blob/411cf29f/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 1fef79a..cbc0bd1 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -146,7 +146,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val masterTracker = new MapOutputTrackerMaster(conf)
     val actorSystem = ActorSystem("test")
     val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+      Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
     // Frame size should be ~123B, and no exception should be thrown
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val masterTracker = new MapOutputTrackerMaster(conf)
     val actorSystem = ActorSystem("test")
     val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+      Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
     // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
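
The test change reflects another Akka 2.3 API change: TestActorRef no longer accepts a bare by-name constructor call and instead takes a Props. A minimal standalone sketch (EchoActor is a made-up example actor, not from this patch):

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.testkit.TestActorRef

    class EchoActor extends Actor {
      def receive = { case msg => sender() ! msg }
    }

    object TestActorRefExample extends App {
      implicit val system = ActorSystem("test")
      // Akka 2.3 style: wrap the constructor in Props
      val ref = TestActorRef[EchoActor](Props(new EchoActor))
      val actor = ref.underlyingActor // synchronous access for assertions
      system.shutdown()
    }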

http://git-wip-us.apache.org/repos/asf/spark/blob/411cf29f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7756c89..d047b9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,7 +118,7 @@
     <mesos.version>0.18.1</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <akka.group>org.spark-project.akka</akka.group>
-    <akka.version>2.2.3-shaded-protobuf</akka.version>
+    <akka.version>2.3.4-spark</akka.version>
     <slf4j.version>1.7.5</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>1.0.4</hadoop.version>
@@ -127,7 +127,7 @@
     <hbase.version>0.94.6</hbase.version>
     <flume.version>1.4.0</flume.version>
     <zookeeper.version>3.4.5</zookeeper.version>
-    <hive.version>0.12.0</hive.version>
+    <hive.version>0.12.0-protobuf</hive.version>
     <parquet.version>1.4.3</parquet.version>
     <jblas.version>1.2.3</jblas.version>
     <jetty.version>8.1.14.v20131031</jetty.version>
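
Both property bumps point at Spark-published artifacts rather than the upstream ones: the org.spark-project.akka group re-publishes Akka 2.3.4 against a shaded protobuf, and the -protobuf Hive build resolves the same protobuf conflict on the Hive side. An sbt-style view of the Akka coordinates the Maven properties expand to (module names assumed for illustration):

    // hedged sketch: same group/version as the pom, assumed module names
    libraryDependencies ++= Seq(
      "org.spark-project.akka" %% "akka-remote" % "2.3.4-spark",
      "org.spark-project.akka" %% "akka-slf4j"  % "2.3.4-spark"
    )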

http://git-wip-us.apache.org/repos/asf/spark/blob/411cf29f/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index a44a45a..fa04fa3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.streaming
 
 import akka.actor.Actor
-import akka.actor.IO
-import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
@@ -143,59 +141,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
   }
 
-  // TODO: This test works in IntelliJ but not through SBT
-  ignore("actor input stream") {
-    // Start the server
-    val testServer = new TestServer()
-    val port = testServer.port
-    testServer.start()
-
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
-      // Had to pass the local value of port to prevent from closing over entire scope
-      StorageLevel.MEMORY_AND_DISK)
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    val outputStream = new TestOutputStream(networkStream, outputBuffer)
-    def output = outputBuffer.flatMap(x => x)
-    outputStream.register()
-    ssc.start()
-
-    // Feed data to the server to send to the network receiver
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val input = 1 to 9
-    val expectedOutput = input.map(x => x.toString)
-    Thread.sleep(1000)
-    for (i <- 0 until input.size) {
-      testServer.send(input(i).toString)
-      Thread.sleep(500)
-      clock.addToTime(batchDuration.milliseconds)
-    }
-    Thread.sleep(1000)
-    logInfo("Stopping server")
-    testServer.stop()
-    logInfo("Stopping context")
-    ssc.stop()
-
-    // Verify whether data received was as expected
-    logInfo("--------------------------------")
-    logInfo("output.size = " + outputBuffer.size)
-    logInfo("output")
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("expected output.size = " + expectedOutput.size)
-    logInfo("expected output")
-    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("--------------------------------")
-
-    // Verify whether all the elements received are as expected
-    // (whether the elements were received one in each interval is not verified)
-    assert(output.size === expectedOutput.size)
-    for (i <- 0 until output.size) {
-      assert(output(i) === expectedOutput(i))
-    }
-  }
-
-
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10
@@ -377,22 +322,6 @@ class TestServer(portToBind: Int = 0) extends Logging {
   def port = serverSocket.getLocalPort
 }
 
-/** This is an actor for testing actor input stream */
-class TestActor(port: Int) extends Actor with ActorHelper {
-
-  def bytesToString(byteString: ByteString) = byteString.utf8String
-
-  override def preStart(): Unit = {
-    @deprecated("suppress compile time deprecation warning", "1.0.0")
-    val unit = IOManager(context.system).connect(new InetSocketAddress(port))
-  }
-
-  def receive = {
-    case IO.Read(socket, bytes) =>
-      store(bytesToString(bytes))
-  }
-}
-
 /** This is a receiver to test multiple threads inserting data using block generator */
 class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
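
The deleted test and its TestActor relied on akka.actor.IO and IOManager, which Akka 2.3 removes outright, so the test is dropped rather than ported. For reference, the 2.3 replacement is the akka.io.Tcp extension; a rough sketch of how such a reader could look there (hypothetical actor, not part of this patch):

    import java.net.InetSocketAddress
    import akka.actor.Actor
    import akka.io.{IO, Tcp}

    class TcpReadActor(host: String, port: Int) extends Actor {
      import context.system // implicit ActorSystem for IO(Tcp)

      override def preStart(): Unit =
        IO(Tcp) ! Tcp.Connect(new InetSocketAddress(host, port))

      def receive = {
        case _: Tcp.Connected =>
          sender() ! Tcp.Register(self) // route incoming bytes to this actor
        case Tcp.Received(data) =>
          println(data.utf8String)      // a real receiver would call store(...) here
        case Tcp.CommandFailed(_: Tcp.Connect) =>
          context.stop(self)
      }
    }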

