Posted to commits@mahout.apache.org by sm...@apache.org on 2015/10/27 05:20:11 UTC

[1/4] mahout git commit: MAHOUT-1775 FileNotFoundException caused by aborting the process of downloading Wikipedia dataset, closes apache/mahout #162

Repository: mahout
Updated Branches:
  refs/heads/flink-binding e943b0a0d -> 854a2893c


MAHOUT-1775 FileNotFoundException caused by aborting the process of downloading Wikipedia dataset, closes apache/mahout #162


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0c6351fa
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0c6351fa
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0c6351fa

Branch: refs/heads/flink-binding
Commit: 0c6351fa5459920ccfd752f54f8fd41176b0afe8
Parents: d53f0a5
Author: smarthi <sm...@apache.org>
Authored: Sun Oct 25 00:41:44 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sun Oct 25 09:57:46 2015 -0400

----------------------------------------------------------------------
 CHANGELOG | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/0c6351fa/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b22aed4..5cd8af5 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Mahout Change Log
 
-Release 0.11.1 - unreleased
+Release 0.12.0 - unreleased
+
+  MAHOUT-1775: FileNotFoundException caused by aborting the process of downloading Wikipedia dataset (Bowei Zhang via smarthi)
 
   MAHOUT-1771: Cluster dumper omits indices and 0 elements for dense vector or sparse containing 0s (srowen)
 


[4/4] mahout git commit: WIP, Flink-Mahout integration, created a decorator; this closes apache/mahout#167

Posted by sm...@apache.org.
WIP, Flink-Mahout integration, created a decorator; this closes apache/mahout#167


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/854a2893
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/854a2893
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/854a2893

Branch: refs/heads/flink-binding
Commit: 854a2893c9b30fea78f9f93d59848f0eb62b175b
Parents: 38d0808
Author: smarthi <sm...@apache.org>
Authored: Tue Oct 27 00:19:46 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Tue Oct 27 00:19:46 2015 -0400

----------------------------------------------------------------------
 .../main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala   | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/854a2893/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 0bc12aa..3db0319 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -20,8 +20,6 @@ package org.apache.mahout.flinkbindings
 
 import java.util.Collection
 
-import org.apache.flink.api.java.utils.DataSetUtils
-
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag


[2/4] mahout git commit: MAHOUT-1775 FileNotFoundException caused by aborting the process of downloading Wikipedia dataset, closes apache/mahout #162

Posted by sm...@apache.org.
MAHOUT-1775 FileNotFoundException caused by aborting the process of downloading Wikipedia dataset, closes apache/mahout #162


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d53f0a5d
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d53f0a5d
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d53f0a5d

Branch: refs/heads/flink-binding
Commit: d53f0a5d78000045bb12e90e3a6808cc2c369450
Parents: e943b0a
Author: smarthi <sm...@apache.org>
Authored: Sun Oct 25 00:29:47 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sun Oct 25 09:57:46 2015 -0400

----------------------------------------------------------------------
 examples/bin/classify-wikipedia.sh | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/d53f0a5d/examples/bin/classify-wikipedia.sh
----------------------------------------------------------------------
diff --git a/examples/bin/classify-wikipedia.sh b/examples/bin/classify-wikipedia.sh
index 470a81c..68487dc 100755
--- a/examples/bin/classify-wikipedia.sh
+++ b/examples/bin/classify-wikipedia.sh
@@ -63,6 +63,8 @@ if [ "x$alg" != "xclean" ]; then
   mkdir -p ${WORK_DIR}
     if [ ! -e ${WORK_DIR}/wikixml ]; then
         mkdir -p ${WORK_DIR}/wikixml
+    fi
+    if [ ! -e ${WORK_DIR}/wikixml/enwiki-latest-pages-articles.xml.bz2 ]; then
         echo "Downloading wikipedia XML dump"
         ########################################################   
         #  Datasets: uncomment and run "clean" to change dataset   
@@ -74,10 +76,11 @@ if [ "x$alg" != "xclean" ]; then
         ######### full wikipedia dump: 10G zipped
         #curl http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2 -o ${WORK_DIR}/wikixml/enwiki-latest-pages-articles.xml.bz2
         ########################################################
-      
-      echo "Extracting..."
+    fi
+    if [ ! -e ${WORK_DIR}/wikixml/enwiki-latest-pages-articles.xml ]; then
+        echo "Extracting..."
        
-      cd ${WORK_DIR}/wikixml && bunzip2 enwiki-latest-pages-articles.xml.bz2 && cd .. && cd ..
+        cd ${WORK_DIR}/wikixml && bunzip2 enwiki-latest-pages-articles.xml.bz2 && cd .. && cd ..
     fi
 
 echo $START_PATH
@@ -186,4 +189,4 @@ elif [ "x$alg" == "xclean" ]; then
   rm -rf $WORK_DIR
   $DFSRM $WORK_DIR
 fi
-# Remove the work directory
\ No newline at end of file
+# Remove the work directory
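
The patch above makes the download and extraction steps resumable: each stage is now guarded by a file-existence check, so a run that was aborted mid-download no longer leaves the script failing with a FileNotFoundException on the next attempt. As a rough illustration only (not part of the commit), here is the same guard pattern sketched in Scala, with hypothetical local paths where the real script keys everything off ${WORK_DIR}/wikixml:

import java.nio.file.{Files, Paths}
import scala.sys.process._

object ResumableWikiFetch extends App {
  // Hypothetical local paths; the script works under ${WORK_DIR}/wikixml.
  val bz2 = Paths.get("wikixml/enwiki-latest-pages-articles.xml.bz2")
  val xml = Paths.get("wikixml/enwiki-latest-pages-articles.xml")

  Files.createDirectories(bz2.getParent)

  // Download only when the compressed dump is absent, mirroring the new
  // "if [ ! -e ...bz2 ]" guard in the script.
  if (!Files.exists(bz2))
    s"curl -o $bz2 http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2".!

  // Extract only when the uncompressed XML is absent (bunzip2 removes the
  // .bz2 on success, as in the script).
  if (!Files.exists(xml))
    s"bunzip2 $bz2".!
}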


[3/4] mahout git commit: WIP, Mahout-Flink Integration, adding missing methods; code refactoring

Posted by sm...@apache.org.
WIP, Mahout-Flink Integration, adding missing methods; code refactoring


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/38d08085
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/38d08085
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/38d08085

Branch: refs/heads/flink-binding
Commit: 38d0808523800a4369b18251e58b04d61771baf5
Parents: 0c6351f
Author: smarthi <sm...@apache.org>
Authored: Mon Oct 26 20:59:42 2015 -0700
Committer: smarthi <sm...@apache.org>
Committed: Mon Oct 26 20:59:42 2015 -0700

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 18 +++++++-
 .../drm/CheckpointedFlinkDrmOps.scala           | 35 +++++++++++++++
 .../mahout/flinkbindings/drm/FlinkDrm.scala     | 20 +++++----
 .../apache/mahout/flinkbindings/package.scala   | 45 +++++++++-----------
 .../drm/CheckpointedDrmSpark.scala              | 25 ++++++-----
 .../drm/CheckpointedDrmSparkOps.scala           | 19 +++++++++
 .../apache/mahout/sparkbindings/package.scala   |  4 +-
 7 files changed, 116 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 5915c0a..0bc12aa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -20,6 +20,8 @@ package org.apache.mahout.flinkbindings
 
 import java.util.Collection
 
+import org.apache.flink.api.java.utils.DataSetUtils
+
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -64,9 +66,10 @@ object FlinkEngine extends DistributedEngine {
     implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
 
     val metadata = hdfsUtils.readDrmHeader(path)
+    println(metadata)
 
     val unwrapKey = metadata.unwrapKeyFunction
-
+    println(unwrapKey)
     val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable],
       classOf[Writable], classOf[VectorWritable], path)
 
@@ -221,7 +224,9 @@ object FlinkEngine extends DistributedEngine {
   /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
   override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
                                            (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
+
     val parallelDrm = parallelize(m, numPartitions)
+
     new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
   }
 
@@ -276,6 +281,17 @@ object FlinkEngine extends DistributedEngine {
 
   def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ???
 
+//  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = {
+//
+//    val ncol = drmX match {
+//      case cp: CheckpointedFlinkDrm[K] ⇒ cp.ncol
+//      case _ ⇒ -1
+//    }
+//
+//    val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples)
+//
+//  }
+
   /** Optional engine-specific all reduce tensor operation. */
   def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = 
     throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink")

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
new file mode 100644
index 0000000..a037d44
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.flinkbindings.drm
+
+import org.apache.mahout.math.drm.CheckpointedDrm
+
+import scala.reflect.ClassTag
+
+class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+  assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "must be a Flink-backed matrix")
+
+  private[flinkbindings] val flinkDrm = drm.asInstanceOf[CheckpointedFlinkDrm[K]]
+
+  /** Flink matrix customization exposure */
+  def dataset = flinkDrm.ds
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index 4a16724..dbc6b11 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -65,15 +65,17 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
         val it = values.asScala.seq
 
         val (keys, vectors) = it.unzip
-        val isDense = vectors.head.isDense
-
-        if (isDense) {
-          val matrix = new DenseMatrix(vectors.size, ncolLocal)
-          vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
-          out.collect((keys.toArray(classTag), matrix))
-        } else {
-          val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
-          out.collect((keys.toArray(classTag), matrix))
+        if (vectors.nonEmpty) {
+          val isDense = vectors.head.isDense
+
+          if (isDense) {
+            val matrix = new DenseMatrix(vectors.size, ncolLocal)
+            vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
+            out.collect((keys.toArray(classTag), matrix))
+          } else {
+            val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
+            out.collect((keys.toArray(classTag), matrix))
+          }
         }
       }
     })
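
The change above wraps the matrix assembly in a vectors.nonEmpty guard, so a co-grouped partition that received no rows is skipped instead of failing on vectors.head. A tiny standalone illustration of the failure mode, using plain Scala collections rather than Mahout types:

object EmptyGroupGuard extends App {
  val emptyGroup: Seq[Array[Double]] = Seq.empty   // a group that received no rows

  // Unguarded access would throw:
  //   emptyGroup.head   // java.util.NoSuchElementException

  // Guarded, as in the patch: emit nothing for an empty group.
  if (emptyGroup.nonEmpty)
    println(s"first row has ${emptyGroup.head.length} columns")
  else
    println("empty group: nothing to collect")
}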

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index c77a551..656b8de 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -18,34 +18,19 @@
  */
 package org.apache.mahout
 
+import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm}
+import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _}
+import org.slf4j.LoggerFactory
+
 import scala.Array._
 import scala.reflect.ClassTag
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.ExecutionEnvironment
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math._
-import org.apache.mahout.math.DenseVector
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.MatrixWritable
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.BlockifiedDrmTuple
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DrmTuple
-import org.slf4j.LoggerFactory
-import org.apache.mahout.math.drm.logical.CheckpointAction
 
 package object flinkbindings {
 
-  private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbingings")
+  private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbindings")
 
   /** Row-wise organized DRM dataset type */
   type DrmDataSet[K] = DataSet[DrmTuple[K]]
@@ -64,18 +49,28 @@ package object flinkbindings {
 
   implicit def wrapContext(env: ExecutionEnvironment): FlinkDistributedContext =
     new FlinkDistributedContext(env)
+
   implicit def unwrapContext(ctx: FlinkDistributedContext): ExecutionEnvironment = ctx.env
 
-  private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrm[K] = {
+  private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K])
+    : CheckpointedFlinkDrm[K] = {
+
     assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "it must be a Flink-backed matrix")
     drm.asInstanceOf[CheckpointedFlinkDrm[K]]
   }
 
-  implicit def checkpointeDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
+  implicit def checkpointedDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
     val flinkDrm = castCheckpointedDrm(cp)
     new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
   }
 
+  /** Adding Spark-specific ops */
+  implicit def cpDrm2cpDrmFlinkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrmOps[K] =
+    new CheckpointedFlinkDrmOps[K](drm)
+
+  implicit def drm2cpDrmFlinkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedFlinkDrmOps[K] = drm: CheckpointedDrm[K]
+
+
   private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m)
   private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v)
   private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get()
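
Together with the new CheckpointedFlinkDrmOps class, the cpDrm2cpDrmFlinkOps and drm2cpDrmFlinkOps implicits above let a Flink-backed DRM expose its underlying DataSet, mirroring drm.rdd on the Spark side. A hedged usage sketch (not from the commit; it assumes an implicit FlinkDistributedContext has already been created, e.g. by wrapping an ExecutionEnvironment):

import org.apache.mahout.flinkbindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings._

// Assumes `ctx` is supplied by the caller; parallelized DRM keys are Int row ordinals.
def peekAtDataset()(implicit ctx: FlinkDistributedContext): Unit = {
  val drmA = drmParallelize(dense((1, 2, 3), (4, 5, 6)), numPartitions = 2)

  // drm2cpDrmFlinkOps / cpDrm2cpDrmFlinkOps make .dataset available here:
  val ds = drmA.dataset            // org.apache.flink.api.java.DataSet of (key, row vector) tuples
  println(ds.collect().size())     // 2 rows; collect() returns a java.util.List
}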

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 38007e0..857cca0 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -17,18 +17,18 @@
 
 package org.apache.mahout.sparkbindings.drm
 
+import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
 import org.apache.mahout.math._
-import math._
-import scalabindings._
-import RLikeOps._
-import drm._
-import scala.collection.JavaConversions._
-import org.apache.spark.storage.StorageLevel
-import reflect._
-import scala.util.Random
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.sparkbindings._
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.JavaConversions._
+import scala.math._
+import scala.reflect._
+import scala.util.Random
 
 /** ==Spark-specific optimizer-checkpointed DRM.==
   *
@@ -39,7 +39,6 @@ import org.apache.mahout.sparkbindings._
   * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands.
   * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows
   *                            (will require a lazy fix for some physical operations.
-  * @param evidence$1 class tag context bound for K.
   * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
   */
 class CheckpointedDrmSpark[K: ClassTag](
@@ -182,7 +181,7 @@ class CheckpointedDrmSpark[K: ClassTag](
       // that nrow can be computed lazily, which always happens when rdd is already available, cached,
       // and it's ok to compute small summaries without triggering huge pipelines. Which usually
       // happens right after things like drmFromHDFS or drmWrap().
-      val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+      val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max) + 1L
       val rowCount = rdd.count()
       _canHaveMissingRows = maxPlus1 != rowCount ||
           rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong
@@ -197,8 +196,8 @@ class CheckpointedDrmSpark[K: ClassTag](
   protected def computeNCol = {
     rddInput.isBlockified match {
       case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
-        .map(_._2.ncol).reduce(max(_, _))
-      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
+        .map(_._2.ncol).reduce(max)
+      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index 25953e1..0a1757a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -1,3 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math.drm.CheckpointedDrm

http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 330ae38..91ad47d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -182,7 +182,7 @@ package object sparkbindings {
     val w = new StringWriter()
     closeables += w
 
-    var continue = true;
+    var continue = true
     val jars = new mutable.ArrayBuffer[String]()
     do {
       val cp = r.readLine()
@@ -230,7 +230,7 @@ package object sparkbindings {
 
     if (!part1Req) warn("blockified rdd: condition not met: exactly 1 per partition")
 
-    return part1Req
+    part1Req
   }
 
 }