You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/12/25 04:49:49 UTC

spark git commit: [SPARK-4873][Streaming] Use `Future.zip` instead of `Future.flatMap`(for-loop) in WriteAheadLogBasedBlockHandler

Repository: spark
Updated Branches:
  refs/heads/master 29fabb1b5 -> b4d0db80a


[SPARK-4873][Streaming] Use `Future.zip` instead of `Future.flatMap`(for-loop) in WriteAheadLogBasedBlockHandler

Use `Future.zip` instead of `Future.flatMap`(for-loop). `zip` implies these two Futures will run concurrently, while `flatMap` usually means one Future depends on the other one.

Author: zsxwing <zs...@gmail.com>

Closes #3721 from zsxwing/SPARK-4873 and squashes the following commits:

46a2cd9 [zsxwing] Use Future.zip instead of Future.flatMap(for-loop)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4d0db80
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4d0db80
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4d0db80

Branch: refs/heads/master
Commit: b4d0db80a0bfba7f1e045d4edb9357b4b2c0a557
Parents: 29fabb1
Author: zsxwing <zs...@gmail.com>
Authored: Wed Dec 24 19:49:41 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 24 19:49:41 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/receiver/ReceivedBlockHandler.scala  | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4d0db80/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index c0670e2..8b97db8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -187,10 +187,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     }
 
     // Combine the futures, wait for both to complete, and return the write ahead log segment
-    val combinedFuture = for {
-      _ <- storeInBlockManagerFuture
-      fileSegment <- storeInWriteAheadLogFuture
-    } yield fileSegment
+    val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
     val segment = Await.result(combinedFuture, blockStoreTimeout)
     WriteAheadLogBasedStoreResult(blockId, segment)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org