You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2014/08/15 04:32:34 UTC
git commit: (NOJIRA) fixing picking up mahout jars for distributed
set up (Note: how was it even working before moment,
and when did it go broken moment)
Repository: mahout
Updated Branches:
refs/heads/master a70f48537 -> a6c8346dd
(NOJIRA) fixing picking up mahout jars for distributed set up
(Note: how was it even working before moment, and when did it go broken moment)
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a6c8346d
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a6c8346d
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a6c8346d
Branch: refs/heads/master
Commit: a6c8346ddb82390e2f5b5bd8c5968c436264e8a6
Parents: a70f485
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Thu Aug 14 19:30:44 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Thu Aug 14 19:30:44 2014 -0700
----------------------------------------------------------------------
bin/mahout | 4 +
.../mahout/math/drm/DrmLikeSuiteBase.scala | 13 +-
.../apache/mahout/sparkbindings/package.scala | 126 ++++++++++---------
.../sparkbindings/SparkBindingsSuite.scala | 34 +++++
4 files changed, 116 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index 0174b31..fafd5c3 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -173,6 +173,10 @@ then
CLASSPATH=${CLASSPATH}:$f;
done
+ for f in $MAHOUT_HOME/math/target/mahout-math-*.jar ; do
+ CLASSPATH=${CLASSPATH}:$f;
+ done
+
for f in $MAHOUT_HOME/spark/target/mahout-spark_*.jar ; do
CLASSPATH=${CLASSPATH}:$f;
done
http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
index 80fb285..7a13124 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
@@ -48,6 +48,7 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
// Print out to see what it is we collected:
println(inCoreB)
+ (inCoreA - inCoreB).norm should be < 1e-7
}
test("DRM parallelizeEmpty") {
@@ -57,13 +58,15 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
// collect back into in-core
val inCoreEmpty = drmEmpty.collect
- //print out to see what it is we collected:
- println(inCoreEmpty)
- printf("drm nrow:%d, ncol:%d\n", drmEmpty.nrow, drmEmpty.ncol)
- printf("in core nrow:%d, ncol:%d\n", inCoreEmpty.nrow, inCoreEmpty.ncol)
+ inCoreEmpty.sum.abs should be < 1e-7
+ drmEmpty.nrow shouldBe 100
+ drmEmpty.ncol shouldBe 50
+ inCoreEmpty.nrow shouldBe 100
+ inCoreEmpty.ncol shouldBe 50
+
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 6639a34..311cf82 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -59,69 +59,15 @@ package object sparkbindings {
sparkConf: SparkConf = new SparkConf(),
addMahoutJars: Boolean = true
): SparkDistributedContext = {
+
val closeables = new java.util.ArrayDeque[Closeable]()
try {
if (addMahoutJars) {
- var mhome = System.getenv("MAHOUT_HOME")
- if (mhome == null) mhome = System.getProperty("mahout.home")
-
- if (mhome == null)
- throw new IllegalArgumentException("MAHOUT_HOME is required to spawn mahout-based spark jobs.")
-
- // Figure Mahout classpath using $MAHOUT_HOME/mahout classpath command.
-
- val fmhome = new File(mhome)
- val bin = new File(fmhome, "bin")
- val exec = new File(bin, "mahout")
- if (!exec.canExecute)
- throw new IllegalArgumentException("Cannot execute %s.".format(exec.getAbsolutePath))
-
- val p = Runtime.getRuntime.exec(Array(exec.getAbsolutePath, "-spark", "classpath"))
-
- closeables.addFirst(new Closeable {
- def close() {
- p.destroy()
- }
- })
-
- val r = new BufferedReader(new InputStreamReader(p.getInputStream))
- closeables.addFirst(r)
-
- val w = new StringWriter()
- closeables.addFirst(w)
-
- var continue = true;
- val jars = new ArrayBuffer[String]()
- do {
- val cp = r.readLine()
- if (cp == null)
- throw new IllegalArgumentException(
- "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?"
- )
-
- val j = cp.split(File.pathSeparatorChar)
- if (j.size > 10) {
- // assume this is a valid classpath line
- jars ++= j
- continue = false
- }
- } while (continue)
-
- // if (s_log.isDebugEnabled) {
- // s_log.debug("Mahout jars:")
- // jars.foreach(j => s_log.debug(j))
- // }
// context specific jars
- val mcjars = jars.filter(j =>
- j.matches(".*mahout-math-.*\\.jar") ||
- j.matches(".*mahout-math-scala-.*\\.jar") ||
- j.matches(".*mahout-mrlegacy-.*\\.jar") ||
- j.matches(".*mahout-spark-.*\\.jar")
- ).filter(n => !n.matches(".*-tests.jar") && !n.matches(".*-sources.jar")) ++
- SparkContext.jarOfClass(classOf[DrmLike[_]])
+ val mcjars = findMahoutContextJars(closeables)
if (log.isDebugEnabled) {
log.debug("Mahout jars:")
@@ -208,5 +154,73 @@ package object sparkbindings {
_canHaveMissingRows = canHaveMissingRows
)
+ /** Acquire proper Mahout jars to be added to task context based on current MAHOUT_HOME. */
+ private[sparkbindings] def findMahoutContextJars(closeables:java.util.Deque[Closeable]) = {
+ var mhome = System.getenv("MAHOUT_HOME")
+ if (mhome == null) mhome = System.getProperty("mahout.home")
+
+ if (mhome == null)
+ throw new IllegalArgumentException("MAHOUT_HOME is required to spawn mahout-based spark jobs.")
+
+ // Figure Mahout classpath using $MAHOUT_HOME/mahout classpath command.
+
+ val fmhome = new File(mhome)
+ val bin = new File(fmhome, "bin")
+ val exec = new File(bin, "mahout")
+ if (!exec.canExecute)
+ throw new IllegalArgumentException("Cannot execute %s.".format(exec.getAbsolutePath))
+
+ val p = Runtime.getRuntime.exec(Array(exec.getAbsolutePath, "-spark", "classpath"))
+
+ closeables.addFirst(new Closeable {
+ def close() {
+ p.destroy()
+ }
+ })
+
+ val r = new BufferedReader(new InputStreamReader(p.getInputStream))
+ closeables.addFirst(r)
+
+ val w = new StringWriter()
+ closeables.addFirst(w)
+
+ var continue = true;
+ val jars = new ArrayBuffer[String]()
+ do {
+ val cp = r.readLine()
+ if (cp == null)
+ throw new IllegalArgumentException(
+ "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?"
+ )
+
+ val j = cp.split(File.pathSeparatorChar)
+ if (j.size > 10) {
+ // assume this is a valid classpath line
+ jars ++= j
+ continue = false
+ }
+ } while (continue)
+
+// jars.foreach(j => log.info(j))
+
+ // context specific jars
+ val mcjars = jars.filter(j =>
+ j.matches(".*mahout-math-\\d.*\\.jar") ||
+ j.matches(".*mahout-math-scala_\\d.*\\.jar") ||
+ j.matches(".*mahout-mrlegacy-\\d.*\\.jar") ||
+ j.matches(".*mahout-spark_\\d.*\\.jar")
+ )
+ // Tune out "bad" classifiers
+ .filter(n =>
+ !n.matches(".*-tests.jar") &&
+ !n.matches(".*-sources.jar") &&
+ !n.matches(".*-job.jar") &&
+ // During maven tests, the maven classpath also creeps in for some reason
+ !n.matches(".*/.m2/.*")
+ )
+
+ mcjars
+ }
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/a6c8346d/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
new file mode 100644
index 0000000..b5974bd
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
@@ -0,0 +1,34 @@
+package org.apache.mahout.sparkbindings
+
+import org.scalatest.FunSuite
+import java.util
+import java.io.{File, Closeable}
+import org.apache.mahout.common.IOUtils
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+
+/**
+ * @author dmitriy
+ */
+class SparkBindingsSuite extends FunSuite with DistributedSparkSuite {
+
+ // This test will succeed only when MAHOUT_HOME is set in the environment. So we keep it
+ // around for diagnostic purposes, but we probably don't want it to run in Jenkins, so we'd
+ // let it be ignored.
+ test("context jars") {
+ System.setProperty("mahout.home", new File("..").getAbsolutePath/*"/home/dmitriy/projects/github/mahout-commits"*/)
+ val closeables = new util.ArrayDeque[Closeable]()
+ try {
+ val mahoutJars = findMahoutContextJars(closeables)
+ mahoutJars.foreach {
+ println(_)
+ }
+
+ mahoutJars.size should be > 0
+ mahoutJars.size shouldBe 4
+ } finally {
+ IOUtils.close(closeables)
+ }
+
+ }
+
+}