Posted to commits@mahout.apache.org by sm...@apache.org on 2015/10/27 05:20:11 UTC
[1/4] mahout git commit: MAHOUT-1775 FileNotFoundException caused by aborting the process of downloading Wikipedia dataset, closes apache/mahout #162
Repository: mahout
Updated Branches:
refs/heads/flink-binding e943b0a0d -> 854a2893c
MAHOUT-1775 FileNotFoundException caused by aborting the process of downloading Wikipedia dataset, closes apache/mahout #162
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0c6351fa
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0c6351fa
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0c6351fa
Branch: refs/heads/flink-binding
Commit: 0c6351fa5459920ccfd752f54f8fd41176b0afe8
Parents: d53f0a5
Author: smarthi <sm...@apache.org>
Authored: Sun Oct 25 00:41:44 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sun Oct 25 09:57:46 2015 -0400
----------------------------------------------------------------------
CHANGELOG | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/0c6351fa/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b22aed4..5cd8af5 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Mahout Change Log
-Release 0.11.1 - unreleased
+Release 0.12.0 - unreleased
+
+ MAHOUT-1775: FileNotFoundException caused by aborting the process of downloading Wikipedia dataset (Bowei Zhang via smarthi)
MAHOUT-1771: Cluster dumper omits indices and 0 elements for dense vector or sparse containing 0s (srowen)
[4/4] mahout git commit: WIP, Flink-Mahout integration, created a decorator; this closes apache/mahout#167
Posted by sm...@apache.org.
WIP, Flink-Mahout integration, created a decorator; this closes apache/mahout#167
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/854a2893
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/854a2893
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/854a2893
Branch: refs/heads/flink-binding
Commit: 854a2893c9b30fea78f9f93d59848f0eb62b175b
Parents: 38d0808
Author: smarthi <sm...@apache.org>
Authored: Tue Oct 27 00:19:46 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Tue Oct 27 00:19:46 2015 -0400
----------------------------------------------------------------------
.../main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/854a2893/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 0bc12aa..3db0319 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -20,8 +20,6 @@ package org.apache.mahout.flinkbindings
import java.util.Collection
-import org.apache.flink.api.java.utils.DataSetUtils
-
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
[2/4] mahout git commit: MAHOUT-1775 FileNotFoundException caused by aborting the process of downloading Wikipedia dataset, closes apache/mahout#162
Posted by sm...@apache.org.
MAHOUT-1775 FileNotFoundException caused by aborting the process of downloading Wikipedia dataset, closes apache/mahout#162
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d53f0a5d
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d53f0a5d
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d53f0a5d
Branch: refs/heads/flink-binding
Commit: d53f0a5d78000045bb12e90e3a6808cc2c369450
Parents: e943b0a
Author: smarthi <sm...@apache.org>
Authored: Sun Oct 25 00:29:47 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sun Oct 25 09:57:46 2015 -0400
----------------------------------------------------------------------
examples/bin/classify-wikipedia.sh | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/d53f0a5d/examples/bin/classify-wikipedia.sh
----------------------------------------------------------------------
diff --git a/examples/bin/classify-wikipedia.sh b/examples/bin/classify-wikipedia.sh
index 470a81c..68487dc 100755
--- a/examples/bin/classify-wikipedia.sh
+++ b/examples/bin/classify-wikipedia.sh
@@ -63,6 +63,8 @@ if [ "x$alg" != "xclean" ]; then
mkdir -p ${WORK_DIR}
if [ ! -e ${WORK_DIR}/wikixml ]; then
mkdir -p ${WORK_DIR}/wikixml
+ fi
+ if [ ! -e ${WORK_DIR}/wikixml/enwiki-latest-pages-articles.xml.bz2 ]; then
echo "Downloading wikipedia XML dump"
########################################################
# Datasets: uncomment and run "clean" to change dataset
@@ -74,10 +76,11 @@ if [ "x$alg" != "xclean" ]; then
######### full wikipedia dump: 10G zipped
#curl http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2 -o ${WORK_DIR}/wikixml/enwiki-latest-pages-articles.xml.bz2
########################################################
-
- echo "Extracting..."
+ fi
+ if [ ! -e ${WORK_DIR}/wikixml/enwiki-latest-pages-articles.xml ]; then
+ echo "Extracting..."
- cd ${WORK_DIR}/wikixml && bunzip2 enwiki-latest-pages-articles.xml.bz2 && cd .. && cd ..
+ cd ${WORK_DIR}/wikixml && bunzip2 enwiki-latest-pages-articles.xml.bz2 && cd .. && cd ..
fi
echo $START_PATH
@@ -186,4 +189,4 @@ elif [ "x$alg" == "xclean" ]; then
rm -rf $WORK_DIR
$DFSRM $WORK_DIR
fi
-# Remove the work directory
\ No newline at end of file
+# Remove the work directory
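The three separate guards introduced above make each stage of the pipeline independently restartable: a run aborted mid-download no longer leaves the script convinced the dataset is present, which is what triggered the FileNotFoundException. A minimal Scala sketch of the same idempotent-stage pattern (the paths and the download/extract helpers below are illustrative stand-ins, not part of the commit):
----------------------------------------------------------------------
import java.nio.file.{Files, Path, Paths}

object GuardedStages {
  val dir: Path = Paths.get("tmp/mahout-work/wikixml")  // illustrative path
  val bz2: Path = dir.resolve("enwiki-latest-pages-articles.xml.bz2")
  val xml: Path = dir.resolve("enwiki-latest-pages-articles.xml")

  def main(args: Array[String]): Unit = {
    // Stage 1: create the working directory only if it is missing.
    if (!Files.exists(dir)) Files.createDirectories(dir)
    // Stage 2: download only if the archive is absent, so an aborted
    // earlier run is retried instead of being mistaken for success.
    if (!Files.exists(bz2)) download(bz2)
    // Stage 3: extract only if the uncompressed dump is still missing.
    if (!Files.exists(xml)) extract(bz2, xml)
  }

  // Hypothetical helpers standing in for curl and bunzip2.
  def download(target: Path): Unit = println(s"downloading $target")
  def extract(src: Path, dst: Path): Unit = println(s"extracting $src -> $dst")
}
----------------------------------------------------------------------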
[3/4] mahout git commit: WIP, Mahout-Flink Integration, adding missing methods; code refactoring
Posted by sm...@apache.org.
WIP, Mahout-Flink Integration, adding missing methods; code refactoring
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/38d08085
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/38d08085
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/38d08085
Branch: refs/heads/flink-binding
Commit: 38d0808523800a4369b18251e58b04d61771baf5
Parents: 0c6351f
Author: smarthi <sm...@apache.org>
Authored: Mon Oct 26 20:59:42 2015 -0700
Committer: smarthi <sm...@apache.org>
Committed: Mon Oct 26 20:59:42 2015 -0700
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkEngine.scala | 18 +++++++-
.../drm/CheckpointedFlinkDrmOps.scala | 35 +++++++++++++++
.../mahout/flinkbindings/drm/FlinkDrm.scala | 20 +++++----
.../apache/mahout/flinkbindings/package.scala | 45 +++++++++-----------
.../drm/CheckpointedDrmSpark.scala | 25 ++++++-----
.../drm/CheckpointedDrmSparkOps.scala | 19 +++++++++
.../apache/mahout/sparkbindings/package.scala | 4 +-
7 files changed, 116 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 5915c0a..0bc12aa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -20,6 +20,8 @@ package org.apache.mahout.flinkbindings
import java.util.Collection
+import org.apache.flink.api.java.utils.DataSetUtils
+
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -64,9 +66,10 @@ object FlinkEngine extends DistributedEngine {
implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
val metadata = hdfsUtils.readDrmHeader(path)
+ println(metadata)
val unwrapKey = metadata.unwrapKeyFunction
-
+ println(unwrapKey)
val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable],
classOf[Writable], classOf[VectorWritable], path)
@@ -221,7 +224,9 @@ object FlinkEngine extends DistributedEngine {
/** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
(implicit dc: DistributedContext): CheckpointedDrm[Int] = {
+
val parallelDrm = parallelize(m, numPartitions)
+
new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
}
@@ -276,6 +281,17 @@ object FlinkEngine extends DistributedEngine {
def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ???
+// def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = {
+//
+// val ncol = drmX match {
+// case cp: CheckpointedFlinkDrm[K] ⇒ cp.ncol
+// case _ ⇒ -1
+// }
+//
+// val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples)
+//
+// }
+
/** Optional engine-specific all reduce tensor operation. */
def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix =
throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink")
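The commented-out drmSampleKRows block above stops short of assembling the sampled tuples into a Matrix. One way it might be completed, shown as a hedged sketch rather than the committed implementation: it assumes the `dataset` exposure from CheckpointedFlinkDrmOps introduced in this commit, plus the usual RLikeOps._ and JavaConverters._ imports, and it collects the (small) sample to the driver:
----------------------------------------------------------------------
// Sketch only: assumes imports of RLikeOps._, JavaConverters._, and the
// flinkbindings implicits that give DrmLike[K] a `dataset` member.
def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int,
                                replacement: Boolean = false): Matrix = {
  val ncol = drmX match {
    case cp: CheckpointedFlinkDrm[K] => cp.ncol
    case _ => -1
  }
  // Sample (key, Vector) tuples on the cluster; the result is small.
  val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples)
  val rows = sample.collect().asScala
  // Assemble the sampled rows into an in-core dense matrix.
  val m = new DenseMatrix(rows.size, ncol)
  rows.zipWithIndex.foreach { case ((_, vec), i) => m(i, ::) := vec }
  m
}
----------------------------------------------------------------------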
http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
new file mode 100644
index 0000000..a037d44
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.flinkbindings.drm
+
+import org.apache.mahout.math.drm.CheckpointedDrm
+
+import scala.reflect.ClassTag
+
+class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+ assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "must be a Flink-backed matrix")
+
+ private[flinkbindings] val flinkDrm = drm.asInstanceOf[CheckpointedFlinkDrm[K]]
+
+ /** Flink matrix customization exposure */
+ def dataset = flinkDrm.ds
+
+
+}
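With the implicit decorators added to the flinkbindings package object later in this commit, the raw Flink DataSet behind a checkpointed DRM becomes directly reachable. A small usage sketch (the 2x2 matrix and partition count are illustrative; assumes the WIP branch as committed here):
----------------------------------------------------------------------
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings._

object DatasetExposure {
  def main(args: Array[String]): Unit = {
    implicit val ctx = new FlinkDistributedContext(
      ExecutionEnvironment.getExecutionEnvironment)
    val a = dense((1, 2), (3, 4))                  // 2x2 in-core matrix
    val drmA = drmParallelize(a, numPartitions = 2)
    // CheckpointedFlinkDrmOps exposes the underlying DataSet[(Int, Vector)]:
    val ds = drmA.checkpoint().dataset
    println(ds.count())                            // prints 2
  }
}
----------------------------------------------------------------------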
http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index 4a16724..dbc6b11 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -65,15 +65,17 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
val it = values.asScala.seq
val (keys, vectors) = it.unzip
- val isDense = vectors.head.isDense
-
- if (isDense) {
- val matrix = new DenseMatrix(vectors.size, ncolLocal)
- vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
- out.collect((keys.toArray(classTag), matrix))
- } else {
- val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
- out.collect((keys.toArray(classTag), matrix))
+ if (vectors.nonEmpty) {
+ val isDense = vectors.head.isDense
+
+ if (isDense) {
+ val matrix = new DenseMatrix(vectors.size, ncolLocal)
+ vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
+ out.collect((keys.toArray(classTag), matrix))
+ } else {
+ val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
+ out.collect((keys.toArray(classTag), matrix))
+ }
}
}
})
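The nonEmpty guard added above matters because blockification gathers all rows of one partition before peeking at `vectors.head`; a partition that happens to receive no rows would otherwise fail. A standalone illustration of the failure mode (not from the commit):
----------------------------------------------------------------------
val vectors = Seq.empty[org.apache.mahout.math.Vector]
// vectors.head                  // would throw NoSuchElementException
if (vectors.nonEmpty) {          // the committed guard: emit nothing instead
  println(vectors.head)
}
----------------------------------------------------------------------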
http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index c77a551..656b8de 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -18,34 +18,19 @@
*/
package org.apache.mahout
+import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm}
+import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _}
+import org.slf4j.LoggerFactory
+
import scala.Array._
import scala.reflect.ClassTag
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.ExecutionEnvironment
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math._
-import org.apache.mahout.math.DenseVector
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.MatrixWritable
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.BlockifiedDrmTuple
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DrmTuple
-import org.slf4j.LoggerFactory
-import org.apache.mahout.math.drm.logical.CheckpointAction
package object flinkbindings {
- private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbingings")
+ private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbindings")
/** Row-wise organized DRM dataset type */
type DrmDataSet[K] = DataSet[DrmTuple[K]]
@@ -64,18 +49,28 @@ package object flinkbindings {
implicit def wrapContext(env: ExecutionEnvironment): FlinkDistributedContext =
new FlinkDistributedContext(env)
+
implicit def unwrapContext(ctx: FlinkDistributedContext): ExecutionEnvironment = ctx.env
- private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrm[K] = {
+ private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K])
+ : CheckpointedFlinkDrm[K] = {
+
assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "it must be a Flink-backed matrix")
drm.asInstanceOf[CheckpointedFlinkDrm[K]]
}
- implicit def checkpointeDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
+ implicit def checkpointedDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
val flinkDrm = castCheckpointedDrm(cp)
new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
}
+ /** Adding Flink-specific ops */
+ implicit def cpDrm2cpDrmFlinkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrmOps[K] =
+ new CheckpointedFlinkDrmOps[K](drm)
+
+ implicit def drm2cpDrmFlinkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedFlinkDrmOps[K] = drm: CheckpointedDrm[K]
+
+
private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m)
private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v)
private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get()
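Note how drm2cpDrmFlinkOps above chains two implicits: the ascription `drm: CheckpointedDrm[K]` in its body triggers the drm package's implicit checkpointing conversion, and the resulting checkpointed matrix is then decorated by cpDrm2cpDrmFlinkOps. A condensed, Mahout-independent sketch of that two-step decorator chain (all names here are generic stand-ins):
----------------------------------------------------------------------
trait DrmLike { def checkpoint(): CheckpointedDrm }
trait CheckpointedDrm extends DrmLike { def ds: String }

// The decorator that exposes engine-specific state.
class FlinkOps(cp: CheckpointedDrm) { def dataset: String = cp.ds }

object Chain {
  implicit def drm2cp(drm: DrmLike): CheckpointedDrm = drm.checkpoint()
  implicit def cp2ops(cp: CheckpointedDrm): FlinkOps = new FlinkOps(cp)
  // Composes both conversions: DrmLike => CheckpointedDrm => FlinkOps.
  implicit def drm2ops(drm: DrmLike): FlinkOps = drm: CheckpointedDrm
}
----------------------------------------------------------------------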
http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 38007e0..857cca0 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -17,18 +17,18 @@
package org.apache.mahout.sparkbindings.drm
+import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.mahout.math._
-import math._
-import scalabindings._
-import RLikeOps._
-import drm._
-import scala.collection.JavaConversions._
-import org.apache.spark.storage.StorageLevel
-import reflect._
-import scala.util.Random
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
import org.apache.mahout.sparkbindings._
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.JavaConversions._
+import scala.math._
+import scala.reflect._
+import scala.util.Random
/** ==Spark-specific optimizer-checkpointed DRM.==
*
@@ -39,7 +39,6 @@ import org.apache.mahout.sparkbindings._
* @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands.
* @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows
* (will require a lazy fix for some physical operations.
- * @param evidence$1 class tag context bound for K.
* @tparam K matrix key type (e.g. the keys of sequence files once persisted)
*/
class CheckpointedDrmSpark[K: ClassTag](
@@ -182,7 +181,7 @@ class CheckpointedDrmSpark[K: ClassTag](
// that nrow can be computed lazily, which always happens when rdd is already available, cached,
// and it's ok to compute small summaries without triggering huge pipelines. Which usually
// happens right after things like drmFromHDFS or drmWrap().
- val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+ val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max) + 1L
val rowCount = rdd.count()
_canHaveMissingRows = maxPlus1 != rowCount ||
rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong
@@ -197,8 +196,8 @@ class CheckpointedDrmSpark[K: ClassTag](
protected def computeNCol = {
rddInput.isBlockified match {
case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
- .map(_._2.ncol).reduce(max(_, _))
- case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
+ .map(_._2.ncol).reduce(max)
+ case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max)
}
}
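For reference, the nrow logic around the `fold(-1)(max)` change works as follows: an int-keyed DRM with a complete key range 0..n-1 satisfies both max(key)+1 == n and sum(keys) == n(n-1)/2, so either test failing flags missing rows. A standalone worked check (a sketch, not the committed code):
----------------------------------------------------------------------
object MissingRows {
  def hasMissingRows(keys: Seq[Int]): Boolean = {
    val n = keys.size.toLong
    val maxPlus1 = keys.fold(-1)(math.max) + 1L
    maxPlus1 != n || keys.map(_.toLong).sum != n * (n - 1) / 2
  }

  def main(args: Array[String]): Unit = {
    println(hasMissingRows(Seq(0, 1, 2, 3)))  // false: complete 0..3
    println(hasMissingRows(Seq(0, 1, 3)))     // true: max+1 = 4 but count = 3
  }
}
----------------------------------------------------------------------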
http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index 25953e1..0a1757a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -1,3 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.mahout.sparkbindings.drm
import org.apache.mahout.math.drm.CheckpointedDrm
http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 330ae38..91ad47d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -182,7 +182,7 @@ package object sparkbindings {
val w = new StringWriter()
closeables += w
- var continue = true;
+ var continue = true
val jars = new mutable.ArrayBuffer[String]()
do {
val cp = r.readLine()
@@ -230,7 +230,7 @@ package object sparkbindings {
if (!part1Req) warn("blockified rdd: condition not met: exactly 1 per partition")
- return part1Req
+ part1Req
}
}
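Both hunks above are idiomatic-Scala cleanups: semicolons are optional at line ends, and a method evaluates to its last expression, so the explicit `return` is redundant (inside closures `return` is actively harmful, unwinding via NonLocalReturnControl). A minimal illustration:
----------------------------------------------------------------------
def isBlockifiedWell(part1Req: Boolean): Boolean = {
  var continue = true   // no trailing semicolon needed
  // ... loop elided ...
  part1Req              // last expression is the result; no `return` keyword
}
----------------------------------------------------------------------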