You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/06/19 19:51:57 UTC

spark git commit: Add example that reads a local file, writes to a DFS path provided by th...

Repository: spark
Updated Branches:
  refs/heads/master 0c32fc125 -> a9858036b


Add example that reads a local file, writes to a DFS path provided by th...

...e user, reads the file back from the DFS, and compares word counts on the local and DFS versions. Useful for verifying DFS correctness.

Author: RJ Nowling <rn...@gmail.com>

Closes #3347 from rnowling/dfs_read_write_test and squashes the following commits:

af8ccb7 [RJ Nowling] Don't use java.io.File since DFS may not be POSIX-compatible
b0ef9ea [RJ Nowling] Fix string style
07c6132 [RJ Nowling] Fix string style
7d9a8df [RJ Nowling] Fix string style
f74c160 [RJ Nowling] Fix else statement style
b9edf12 [RJ Nowling] Fix spark wc style
44415b9 [RJ Nowling] Fix local wc style
94a4691 [RJ Nowling] Fix space
df59b65 [RJ Nowling] Fix if statements
1b314f0 [RJ Nowling] Add scaladoc
a931d70 [RJ Nowling] Fix import order
0c89558 [RJ Nowling] Add example that reads a local file, writes to a DFS path provided by the user, reads the file back from the DFS, and compares word counts on the local and DFS versions. Useful for verifying DFS correctness.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9858036
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9858036
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9858036

Branch: refs/heads/master
Commit: a9858036bfd339b47dd6d2ed69ccbb61269c225e
Parents: 0c32fc1
Author: RJ Nowling <rn...@gmail.com>
Authored: Fri Jun 19 10:50:44 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Jun 19 10:51:37 2015 -0700

----------------------------------------------------------------------
 .../spark/examples/DFSReadWriteTest.scala       | 138 +++++++++++++++++++
 1 file changed, 138 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9858036/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
new file mode 100644
index 0000000..c05890d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import java.io.File
+
+import scala.io.Source._
+
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.SparkContext._
+
+/**
+  * Simple test for reading and writing to a distributed
+  * file system.  This example does the following:
+  * 
+  *   1. Reads local file
+  *   2. Computes word count on local file
+  *   3. Writes local file to a DFS
+  *   4. Reads the file back from the DFS
+  *   5. Computes word count on the file using Spark
+  *   6. Compares the word count results
+  */
+object DFSReadWriteTest {
+  
+  private var localFilePath: File = new File(".")
+  private var dfsDirPath: String = ""
+
+  private val NPARAMS = 2
+
+  private def readFile(filename: String): List[String] = {
+    val lineIter: Iterator[String] = fromFile(filename).getLines()
+    val lineList: List[String] = lineIter.toList
+    lineList
+  }
+
+  private def printUsage(): Unit = {
+    val usage: String = "DFS Read-Write Test\n" +
+    "\n" +
+    "Usage: localFile dfsDir\n" +
+    "\n" +
+    "localFile - (string) local file to use in test\n" +
+    "dfsDir - (string) DFS directory for read/write tests\n"
+
+    println(usage)
+  }
+
+  private def parseArgs(args: Array[String]): Unit = {
+    if (args.length != NPARAMS) {
+      printUsage()
+      System.exit(1)
+    }
+
+    var i = 0
+
+    localFilePath = new File(args(i))
+    if (!localFilePath.exists) {
+      System.err.println("Given path (" + args(i) + ") does not exist.\n")
+      printUsage()
+      System.exit(1)
+    }
+
+    if (!localFilePath.isFile) {
+      System.err.println("Given path (" + args(i) + ") is not a file.\n")
+      printUsage()
+      System.exit(1)
+    }
+
+    i += 1
+    dfsDirPath = args(i)
+  }
+
+  def runLocalWordCount(fileContents: List[String]): Int = {
+    fileContents.flatMap(_.split(" "))
+      .flatMap(_.split("\t"))
+      .filter(_.size > 0)
+      .groupBy(w => w)
+      .mapValues(_.size)
+      .values
+      .sum
+  }
+
+  def main(args: Array[String]): Unit = {
+    parseArgs(args)
+
+    println("Performing local word count")
+    val fileContents = readFile(localFilePath.toString())
+    val localWordCount = runLocalWordCount(fileContents)
+
+    println("Creating SparkConf")
+    val conf = new SparkConf().setAppName("DFS Read Write Test")
+
+    println("Creating SparkContext")
+    val sc = new SparkContext(conf)
+
+    println("Writing local file to DFS")
+    val dfsFilename = dfsDirPath + "/dfs_read_write_test"
+    val fileRDD = sc.parallelize(fileContents)
+    fileRDD.saveAsTextFile(dfsFilename)
+
+    println("Reading file from DFS and running Word Count")
+    val readFileRDD = sc.textFile(dfsFilename)
+
+    val dfsWordCount = readFileRDD
+      .flatMap(_.split(" "))
+      .flatMap(_.split("\t"))
+      .filter(_.size > 0)
+      .map(w => (w, 1))
+      .countByKey()
+      .values
+      .sum
+
+    sc.stop()
+
+    if (localWordCount == dfsWordCount) {
+      println(s"Success! Local Word Count ($localWordCount) " +
+        s"and DFS Word Count ($dfsWordCount) agree.")
+    } else {
+      println(s"Failure! Local Word Count ($localWordCount) " +
+        s"and DFS Word Count ($dfsWordCount) disagree.")
+    }
+
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org