Posted to commits@mahout.apache.org by dl...@apache.org on 2014/08/15 04:32:34 UTC

git commit: (NOJIRA) fix picking up Mahout jars for a distributed setup (Note: unclear how this was working before, and when it broke)

Repository: mahout
Updated Branches:
  refs/heads/master a70f48537 -> a6c8346dd


(NOJIRA) fix picking up Mahout jars for a distributed setup
(Note: unclear how this was working before, and when it broke)


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a6c8346d
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a6c8346d
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a6c8346d

Branch: refs/heads/master
Commit: a6c8346ddb82390e2f5b5bd8c5968c436264e8a6
Parents: a70f485
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Thu Aug 14 19:30:44 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Thu Aug 14 19:30:44 2014 -0700

----------------------------------------------------------------------
 bin/mahout                                      |   4 +
 .../mahout/math/drm/DrmLikeSuiteBase.scala      |  13 +-
 .../apache/mahout/sparkbindings/package.scala   | 126 ++++++++++---------
 .../sparkbindings/SparkBindingsSuite.scala      |  34 +++++
 4 files changed, 116 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index 0174b31..fafd5c3 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -173,6 +173,10 @@ then
       CLASSPATH=${CLASSPATH}:$f;
     done
 
+    for f in $MAHOUT_HOME/math/target/mahout-math-*.jar ; do
+      CLASSPATH=${CLASSPATH}:$f;
+    done
+
     for f in $MAHOUT_HOME/spark/target/mahout-spark_*.jar ; do
       CLASSPATH=${CLASSPATH}:$f;
     done
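
For context: the Spark bindings (package.scala below) shell out to "mahout -spark classpath" and split and filter the classpath line this script assembles, so the math jar has to appear here before it can be shipped to workers. A minimal, self-contained sketch of that consumption, with hypothetical Unix-style paths:

    // Sketch only: mirrors the split-and-filter logic in package.scala below.
    val line = "/opt/mahout/math/target/mahout-math-1.0-SNAPSHOT.jar:" +
      "/opt/mahout/math/target/mahout-math-1.0-SNAPSHOT-tests.jar:" +
      "/opt/mahout/spark/target/mahout-spark_2.10-1.0-SNAPSHOT.jar"
    val jars = line.split(java.io.File.pathSeparatorChar)
    val mcjars = jars
      .filter(j => j.matches(".*mahout-math-\\d.*\\.jar") ||
                   j.matches(".*mahout-spark_\\d.*\\.jar"))
      .filterNot(_.matches(".*-tests.jar"))
    // mcjars keeps the math and spark jars and drops the -tests artifact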

http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
index 80fb285..7a13124 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
@@ -48,6 +48,7 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
     // Print out to see what it is we collected:
     println(inCoreB)
 
+    (inCoreA - inCoreB).norm should be < 1e-7
   }
 
   test("DRM parallelizeEmpty") {
@@ -57,13 +58,15 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
     // collect back into in-core
     val inCoreEmpty = drmEmpty.collect
 
-    //print out to see what it is we collected:
-    println(inCoreEmpty)
-    printf("drm nrow:%d, ncol:%d\n", drmEmpty.nrow, drmEmpty.ncol)
-    printf("in core nrow:%d, ncol:%d\n", inCoreEmpty.nrow, inCoreEmpty.ncol)
+    inCoreEmpty.sum.abs should be < 1e-7
+    drmEmpty.nrow shouldBe 100
+    drmEmpty.ncol shouldBe 50
+    inCoreEmpty.nrow shouldBe 100
+    inCoreEmpty.ncol shouldBe 50
+
 
 
-  }
 
+  }
 
 }
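
These changes tighten the tests from println-based eyeballing to numeric checks. As a hedged illustration of the norm-tolerance idiom on plain in-core matrices (a sketch, not part of this commit; assumes the math-scala R-like DSL):

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    val a = dense((1.0, 2.0), (3.0, 4.0))
    val b = a.cloned                 // exact copy
    (a - b).norm                     // 0.0, comfortably below the 1e-7 tolerance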

http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 6639a34..311cf82 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -59,69 +59,15 @@ package object sparkbindings {
       sparkConf: SparkConf = new SparkConf(),
       addMahoutJars: Boolean = true
       ): SparkDistributedContext = {
+
     val closeables = new java.util.ArrayDeque[Closeable]()
 
     try {
 
       if (addMahoutJars) {
-        var mhome = System.getenv("MAHOUT_HOME")
-        if (mhome == null) mhome = System.getProperty("mahout.home")
-
-        if (mhome == null)
-          throw new IllegalArgumentException("MAHOUT_HOME is required to spawn mahout-based spark jobs.")
-
-        // Figure Mahout classpath using $MAHOUT_HOME/mahout classpath command.
-
-        val fmhome = new File(mhome)
-        val bin = new File(fmhome, "bin")
-        val exec = new File(bin, "mahout")
-        if (!exec.canExecute)
-          throw new IllegalArgumentException("Cannot execute %s.".format(exec.getAbsolutePath))
-
-        val p = Runtime.getRuntime.exec(Array(exec.getAbsolutePath, "-spark", "classpath"))
-
-        closeables.addFirst(new Closeable {
-          def close() {
-            p.destroy()
-          }
-        })
-
-        val r = new BufferedReader(new InputStreamReader(p.getInputStream))
-        closeables.addFirst(r)
-
-        val w = new StringWriter()
-        closeables.addFirst(w)
-
-        var continue = true;
-        val jars = new ArrayBuffer[String]()
-        do {
-          val cp = r.readLine()
-          if (cp == null)
-            throw new IllegalArgumentException(
-              "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?"
-            )
-
-          val j = cp.split(File.pathSeparatorChar)
-          if (j.size > 10) {
-            // assume this is a valid classpath line
-            jars ++= j
-            continue = false
-          }
-        } while (continue)
-
-        //      if (s_log.isDebugEnabled) {
-        //        s_log.debug("Mahout jars:")
-        //        jars.foreach(j => s_log.debug(j))
-        //      }
 
         // context specific jars
-        val mcjars = jars.filter(j =>
-          j.matches(".*mahout-math-.*\\.jar") ||
-              j.matches(".*mahout-math-scala-.*\\.jar") ||
-              j.matches(".*mahout-mrlegacy-.*\\.jar") ||
-              j.matches(".*mahout-spark-.*\\.jar")
-        ).filter(n => !n.matches(".*-tests.jar") && !n.matches(".*-sources.jar")) ++
-            SparkContext.jarOfClass(classOf[DrmLike[_]])
+        val mcjars = findMahoutContextJars(closeables)
 
         if (log.isDebugEnabled) {
           log.debug("Mahout jars:")
@@ -208,5 +154,73 @@ package object sparkbindings {
       _canHaveMissingRows = canHaveMissingRows
     )
 
+  /** Acquire the proper Mahout jars to add to the task context, based on the current MAHOUT_HOME. */
+  private[sparkbindings] def findMahoutContextJars(closeables:java.util.Deque[Closeable]) = {
+    var mhome = System.getenv("MAHOUT_HOME")
+    if (mhome == null) mhome = System.getProperty("mahout.home")
+
+    if (mhome == null)
+      throw new IllegalArgumentException("MAHOUT_HOME is required to spawn mahout-based spark jobs.")
+
+    // Figure Mahout classpath using $MAHOUT_HOME/mahout classpath command.
+
+    val fmhome = new File(mhome)
+    val bin = new File(fmhome, "bin")
+    val exec = new File(bin, "mahout")
+    if (!exec.canExecute)
+      throw new IllegalArgumentException("Cannot execute %s.".format(exec.getAbsolutePath))
+
+    val p = Runtime.getRuntime.exec(Array(exec.getAbsolutePath, "-spark", "classpath"))
+
+    closeables.addFirst(new Closeable {
+      def close() {
+        p.destroy()
+      }
+    })
+
+    val r = new BufferedReader(new InputStreamReader(p.getInputStream))
+    closeables.addFirst(r)
+
+    val w = new StringWriter()
+    closeables.addFirst(w)
+
+    var continue = true;
+    val jars = new ArrayBuffer[String]()
+    do {
+      val cp = r.readLine()
+      if (cp == null)
+        throw new IllegalArgumentException(
+          "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?"
+        )
+
+      val j = cp.split(File.pathSeparatorChar)
+      if (j.size > 10) {
+        // assume this is a valid classpath line
+        jars ++= j
+        continue = false
+      }
+    } while (continue)
+
+//    jars.foreach(j => log.info(j))
+
+    // context specific jars
+    val mcjars = jars.filter(j =>
+      j.matches(".*mahout-math-\\d.*\\.jar") ||
+          j.matches(".*mahout-math-scala_\\d.*\\.jar") ||
+          j.matches(".*mahout-mrlegacy-\\d.*\\.jar") ||
+          j.matches(".*mahout-spark_\\d.*\\.jar")
+    )
+        // Tune out "bad" classifiers
+        .filter(n =>
+      !n.matches(".*-tests.jar") &&
+          !n.matches(".*-sources.jar") &&
+          !n.matches(".*-job.jar") &&
+          // During maven tests, the maven classpath also creeps in for some reason
+          !n.matches(".*/.m2/.*")
+        )
+
+    mcjars
+  }
+
 
 }
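
A note on the tightened patterns, which likely answers the commit message's "how was it even working" question: the old mahout-math-scala- and mahout-spark- rules expected hyphenated names, while Scala modules are published with an underscore plus Scala version suffix (as the mahout-spark_*.jar glob in bin/mahout above confirms), so the scala jar was caught only by the overly broad mahout-math rule. The digit-anchored patterns now match each artifact exactly once. A quick sketch, jar name hypothetical:

    val j = "mahout-math-scala_2.10-1.0-SNAPSHOT.jar"
    j.matches(".*mahout-math-.*\\.jar")          // true  -- old: caught by the broad math rule
    j.matches(".*mahout-math-scala-.*\\.jar")    // false -- old: hyphen never matched underscore
    j.matches(".*mahout-math-\\d.*\\.jar")       // false -- new: math rule no longer swallows it
    j.matches(".*mahout-math-scala_\\d.*\\.jar") // true  -- new: matched exactly once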

http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
new file mode 100644
index 0000000..b5974bd
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
@@ -0,0 +1,34 @@
+package org.apache.mahout.sparkbindings
+
+import org.scalatest.FunSuite
+import java.util
+import java.io.{File, Closeable}
+import org.apache.mahout.common.IOUtils
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+
+/**
+ * @author dmitriy
+ */
+class SparkBindingsSuite extends FunSuite with DistributedSparkSuite {
+
+  // This test will succeed only when MAHOUT_HOME is set in the environment. So we keep it
+  // around for diagnostic purposes, but we probably don't want it to run in Jenkins, so we'd
+  // let it be ignored.
+  test("context jars") {
+    System.setProperty("mahout.home", new File("..").getAbsolutePath/*"/home/dmitriy/projects/github/mahout-commits"*/)
+    val closeables = new util.ArrayDeque[Closeable]()
+    try {
+      val mahoutJars = findMahoutContextJars(closeables)
+      mahoutJars.foreach {
+        println(_)
+      }
+
+      mahoutJars.size should be > 0
+      mahoutJars.size shouldBe 4
+    } finally {
+      IOUtils.close(closeables)
+    }
+
+  }
+
+}
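
For completeness, a hedged sketch of the normal entry point: mahoutSparkContext (whose trailing parameters appear in the package.scala hunk above) calls findMahoutContextJars when addMahoutJars = true. The leading masterUrl/appName parameters are an assumption here, since they fall outside the hunk shown:

    import org.apache.mahout.sparkbindings._
    import org.apache.spark.SparkConf

    // Assumes MAHOUT_HOME (or -Dmahout.home) points at a built Mahout tree.
    val ctx = mahoutSparkContext(masterUrl = "local[2]", appName = "jar-pickup-check",
      sparkConf = new SparkConf(), addMahoutJars = true)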