Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/13 06:31:35 UTC

[5/6] git commit: Address code review concerns and comments.

Address code review concerns and comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5a8abfb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5a8abfb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5a8abfb7

Branch: refs/heads/master
Commit: 5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d
Parents: f1c5eca
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 19:15:09 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 19:15:09 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala   | 2 +-
 .../src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 3 ++-
 core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala    | 6 +++---
 .../scala/org/apache/spark/storage/BlockManagerWorker.scala    | 6 +++---
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala   | 3 ++-
 .../test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala | 5 +----
 .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala   | 6 +++---
 .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala   | 6 +++---
 8 files changed, 18 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
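
All eight files receive the same family of review-requested style cleanups: a var that is never reassigned becomes a val, two long anonymous result expressions gain descriptive names, redundant return keywords are dropped, a multi-line if/else gains braces, a needlessly wrapped method signature collapses onto one line, and both YARN Client copies trade a trivial if/else for an early-return guard. Short illustrative sketches accompany the diffs below.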


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index bba873a..4e63117 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -189,7 +189,7 @@ object SparkHadoopWriter {
     if (path == null) {
       throw new IllegalArgumentException("Output path is null")
     }
-    var outputPath = new Path(path)
+    val outputPath = new Path(path)
     val fs = outputPath.getFileSystem(conf)
     if (outputPath == null || fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")
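
The fix above is the standard Scala preference for immutable bindings: outputPath is never reassigned, so val is the right keyword. A minimal sketch of the idiom, using a hypothetical stand-in Path class rather than the Hadoop one in the real file:

    case class Path(uri: String) // hypothetical stand-in, not org.apache.hadoop.fs.Path

    def resolveOutput(path: String): Path = {
      if (path == null) {
        throw new IllegalArgumentException("Output path is null")
      }
      val outputPath = Path(path) // val: the binding is never reassigned
      // outputPath = Path("other")  -- reassignment would now fail to compile
      outputPath
    }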

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8830de7..82527fe 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-    new Iterator[Array[Byte]] {
+    val stdoutIterator = new Iterator[Array[Byte]] {
       def next(): Array[Byte] = {
         val obj = _nextObj
         if (hasNext) {
@@ -156,6 +156,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
       def hasNext = _nextObj.length != 0
     }
+    stdoutIterator
   }
 
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
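
The hunks above give the method's long anonymous result a name: the new Iterator[Array[Byte]] { ... } block is bound to stdoutIterator, and that name becomes the method's last expression, so a reader scanning a multi-screen method can see at a glance what it returns. A sketch of the same idiom on a toy iterator (hypothetical code, not PythonRDD):

    def countdown(from: Int): Iterator[Int] = {
      val countdownIterator = new Iterator[Int] {
        private var current = from
        def hasNext: Boolean = current > 0
        def next(): Int = {
          val value = current
          current -= 1
          value
        }
      }
      countdownIterator // named result instead of ending on a bare anonymous block
    }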

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index a5394a2..cefcc3d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -295,9 +295,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
 
     val prefPartActual = prefPart.get
 
-    if (minPowerOfTwo.size + slack <= prefPartActual.size)  // more imbalance than the slack allows
+    if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
       minPowerOfTwo  // prefer balance over locality
-    else {
+    } else {
       prefPartActual // prefer locality over balance
     }
   }
@@ -331,7 +331,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
    */
   def run(): Array[PartitionGroup] = {
     setupGroups(math.min(prev.partitions.length, maxPartitions))   // setup the groups (bins)
-    throwBalls()             // assign partitions (balls) to each group (bins)
+    throwBalls() // assign partitions (balls) to each group (bins)
     getPartitions
   }
 }
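
Both hunks are formatting: the first braces an if/else whose branches carry end-of-line comments (an unbraced branch followed by a comment is easy to misread), and the second normalizes the spacing before an inline comment. The bracing rule in sketch form, with hypothetical names and values:

    def pickSize(minSize: Int, preferredSize: Int, slack: Int): Int = {
      if (minSize + slack <= preferredSize) { // imbalance exceeds the slack
        minSize // prefer balance over locality
      } else {
        preferredSize // prefer locality over balance
      }
    }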

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index a36abe0..42f52d7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -45,7 +45,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
           Some(new BlockMessageArray(responseMessages).toBufferMessage)
         } catch {
           case e: Exception => logError("Exception handling buffer message", e)
-          return None
+          None
         }
       }
       case otherMessage: Any => {
@@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
     val blockMessageArray = new BlockMessageArray(blockMessage)
     val resultMessage = connectionManager.sendMessageReliablySync(
         toConnManagerId, blockMessageArray.toBufferMessage)
-    return (resultMessage != None)
+    resultMessage != None
   }
 
   def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
@@ -130,7 +130,7 @@ private[spark] object BlockManagerWorker extends Logging {
             return blockMessage.getData
           })
       }
-      case None => logDebug("No response message received"); return null
+      case None => logDebug("No response message received")
     }
     null
   }
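
All three hunks delete a redundant return: in Scala the last expression of a block is already its value, and a return inside a function literal is compiled as a thrown NonLocalReturnControl, so bare trailing expressions are the preferred style. A hypothetical sketch:

    def sentSuccessfully(reply: Option[String]): Boolean = {
      // instead of: return reply != None
      reply != None // the block's last expression is its value
    }

(reply.isDefined would be the more idiomatic test, but the sketch mirrors the comparison used in the code above.)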

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 14f89d5..f0236ef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
         locations: Seq[Seq[String]] = Nil
       ): MyRDD = {
     val maxPartition = numPartitions - 1
-    new MyRDD(sc, dependencies) {
+    val newRDD = new MyRDD(sc, dependencies) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         throw new RuntimeException("should not be reached")
       override def getPartitions = (0 to maxPartition).map(i => new Partition {
@@ -135,6 +135,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
           Nil
       override def toString: String = "DAGSchedulerSuiteRDD " + id
     }
+    newRDD
   }
 
   /**
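
The same naming idiom as the PythonRDD change earlier in this commit: the anonymous new MyRDD(...) { ... } test double is bound to newRDD and returned by name, making the helper's result explicit even though the overriding subclass body runs long.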

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 3880e68..2910291 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -42,10 +42,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
       def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage) 
     }
     type MyRDD = RDD[(Int, Int)]
-    def makeRdd(
-        numPartitions: Int,
-        dependencies: List[Dependency[_]]
-      ): MyRDD = {
+    def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
       val maxPartition = numPartitions - 1
       new MyRDD(sc, dependencies) {
         override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
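
A pure layout fix: the makeRdd signature fits within the line-length limit, so the wrapped three-line parameter list collapses onto a single line. Declarations are only wrapped when they genuinely overflow.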

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e1fe09e..e56bc02 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -191,10 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
+      return false
     }
+
+    true
   }
 
   /** Copy the file into HDFS if needed. */
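
This change moves in the opposite direction from the BlockManagerWorker cleanup, and deliberately so: an if/else whose branches are bare false and true is replaced by an early "return false" guard, leaving the happy-path true flat at the end of the method and matching the earlier checks visible in the surrounding context. The shape in sketch form, with hypothetical parameters:

    def sameEndpoint(srcHost: String, dstHost: String, srcPort: Int, dstPort: Int): Boolean = {
      if (srcHost != dstHost) {
        return false // guard: bail out on the first mismatch
      }
      if (srcPort != dstPort) {
        return false
      }
      true // success case, unindented at the end
    }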

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c084485..51d9adb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -206,10 +206,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
+      return false
     }
+
+    true
   }
 
   /** Copy the file into HDFS if needed. */