Posted to commits@mahout.apache.org by sm...@apache.org on 2016/09/07 04:33:15 UTC

mahout git commit: MAHOUT-1865: Remove Hadoop1 Profile, this closes apache/mahout#253

Repository: mahout
Updated Branches:
  refs/heads/master c5934c2f7 -> e73fdb869


MAHOUT-1865: Remove Hadoop1 Profile, this closes apache/mahout#253


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e73fdb86
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e73fdb86
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e73fdb86

Branch: refs/heads/master
Commit: e73fdb8694e80e1e95a1213097434749726fd8af
Parents: c5934c2
Author: smarthi <sm...@apache.org>
Authored: Wed Sep 7 00:32:50 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Wed Sep 7 00:32:50 2016 -0400

----------------------------------------------------------------------
 .../flinkbindings/io/Hadoop2HDFSUtil.scala      | 18 ++---
 .../mahout/h2o/common/Hadoop1HDFSUtil.scala     | 63 ---------------
 .../mahout/h2o/common/Hadoop2HDFSUtil.scala     | 63 +++++++++++++++
 .../apache/mahout/h2obindings/H2OEngine.scala   |  4 +-
 .../apache/mahout/cf/taste/hadoop/als/ALS.java  |  5 +-
 .../hadoop/als/FactorizationEvaluator.java      |  2 +-
 .../cf/taste/hadoop/item/RecommenderJob.java    |  2 +-
 .../recommender/svd/AbstractFactorizer.java     |  2 +-
 .../classifier/naivebayes/BayesUtils.java       | 11 ++-
 .../training/IndexInstancesMapper.java          |  2 +-
 pom.xml                                         |  4 +-
 .../apache/mahout/common/Hadoop1HDFSUtil.scala  | 83 --------------------
 .../apache/mahout/common/Hadoop2HDFSUtil.scala  | 83 ++++++++++++++++++++
 .../apache/mahout/drivers/TrainNBDriver.scala   |  4 +-
 .../mahout/sparkbindings/SparkEngine.scala      |  4 +-
 15 files changed, 173 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
index 9b67913..211088a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
@@ -20,16 +20,14 @@ package org.apache.mahout.flinkbindings.io
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
+import org.apache.hadoop.io.SequenceFile.Reader
+import org.apache.hadoop.io.Writable
 
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies.
- */
 object Hadoop2HDFSUtil extends HDFSUtil {
 
   /**
    * Read the header of a sequence file and determine the Key and Value type
-   * @param path
+   * @param path - hdfs path of Sequence File
    * @return
    */
   def readDrmHeader(path: String): DrmMetadata = {
@@ -43,7 +41,7 @@ object Hadoop2HDFSUtil extends HDFSUtil {
 
       // Filter out anything starting with .
       .filter { s =>
-        !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir
+        !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory
       }
 
       // Take path
@@ -57,12 +55,8 @@ object Hadoop2HDFSUtil extends HDFSUtil {
         throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
       }
 
-    // flink is retiring hadoop 1
-     val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+     val reader = new Reader(fs.getConf, Reader.file(partFilePath))
 
-    // hadoop 2 reader
-//    val reader: SequenceFile.Reader = new SequenceFile.Reader(fs.getConf,
-//      SequenceFile.Reader.file(partFilePath));
     try {
       new DrmMetadata(
         keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
@@ -75,7 +69,7 @@ object Hadoop2HDFSUtil extends HDFSUtil {
 
   /**
    * Delete a path from the filesystem
-   * @param path
+   * @param path - hdfs path
    */
   def delete(path: String) {
     val dfsPath = new Path(path)
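
For reference, a minimal Scala sketch of the Hadoop 2 SequenceFile.Reader usage the hunk above moves to: the part file is passed as a Reader.Option rather than through the deprecated Reader(FileSystem, Path, Configuration) constructor, and FileStatus.isDirectory replaces the removed isDir. The path and object name below are hypothetical, not part of the commit.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.SequenceFile

    object ReaderSketch extends App {
      val conf = new Configuration()
      // Hypothetical part file; in the hunk above this is the first part file found under the DRM directory.
      val part = new Path("/tmp/drm/part-00000")

      // Hadoop 2 style: path supplied as a Reader.Option, not via the deprecated
      // Reader(FileSystem, Path, Configuration) constructor.
      val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(part))
      try {
        println(s"key=${reader.getKeyClass.getName} value=${reader.getValueClass.getName}")
      } finally {
        reader.close()
      }
    }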

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
deleted file mode 100644
index a540cb1..0000000
--- a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.h2o.common
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
- * with Hadoop 2.0
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-  
-  def readDrmHeader(path: String): DrmMetadata = {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    val partFilePath:Path = fs.listStatus(dfsPath)
-
-        // Filter out anything starting with .
-        .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir }
-
-        // Take path
-        .map(_.getPath)
-
-        // Take only one, if any
-        .headOption
-
-        // Require there's at least one partition file found.
-        .getOrElse {
-      throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
-    }
-
-    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
-    try {
-      new DrmMetadata(
-        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
-        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
-      )
-    } finally {
-      reader.close()
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
new file mode 100644
index 0000000..4053d09
--- /dev/null
+++ b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.h2o.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{SequenceFile, Writable}
+
+/**
+ * HDFS utilities backed by the Hadoop 2 API (SequenceFile.Reader built from
+ * Reader.Options rather than the deprecated Hadoop 1 constructors).
+ */
+object Hadoop2HDFSUtil extends HDFSUtil {
+
+  
+  def readDrmHeader(path: String): DrmMetadata = {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    val partFilePath:Path = fs.listStatus(dfsPath)
+
+        // Filter out anything starting with .
+        .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory }
+
+        // Take path
+        .map(_.getPath)
+
+        // Take only one, if any
+        .headOption
+
+        // Require there's at least one partition file found.
+        .getOrElse {
+      throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+    }
+
+    val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath))
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
+      )
+    } finally {
+      reader.close()
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 60bf7ac..494e8a8 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -25,7 +25,7 @@ import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.h2obindings.ops._
 import org.apache.mahout.h2obindings.drm._
-import org.apache.mahout.h2o.common.{Hadoop1HDFSUtil, HDFSUtil}
+import org.apache.mahout.h2o.common.{Hadoop2HDFSUtil, HDFSUtil}
 import org.apache.mahout.logging._
 
 /** H2O specific non-DRM operations */
@@ -34,7 +34,7 @@ object H2OEngine extends DistributedEngine {
   private final implicit val log = getLog(H2OEngine.getClass)
 
   // By default, use Hadoop 2 utils
-  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+  var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
 
   def colMeans[K](drm: CheckpointedDrm[K]): Vector =
     H2OHelper.colMeans(drm.h2odrm.frame)

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
index 1c07b67..4bb95ae 100644
--- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
+++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
@@ -17,12 +17,11 @@
 
 package org.apache.mahout.cf.taste.hadoop.als;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -64,7 +63,7 @@ final class ALS {
     LocalFileSystem localFs = FileSystem.getLocal(conf);
 
     for (Path cachedFile : cachedFiles) {
-      try (SequenceFile.Reader reader = new SequenceFile.Reader(localFs, cachedFile, conf)){
+      try (SequenceFile.Reader reader = new SequenceFile.Reader(localFs.getConf(), SequenceFile.Reader.file(cachedFile))) {
         while (reader.next(rowIndex, row)) {
           featureMatrix.put(rowIndex.get(), row.get());
         }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
index e69053c..4e6aaf5 100644
--- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
+++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
@@ -112,7 +112,7 @@ public class FactorizationEvaluator extends AbstractJob {
     return 0;
   }
 
-  double computeRmse(Path errors) {
+  private double computeRmse(Path errors) {
     RunningAverage average = new FullRunningAverage();
     for (Pair<DoubleWritable,NullWritable> entry
         : new SequenceFileDirIterable<DoubleWritable, NullWritable>(errors, PathType.LIST, PathFilters.logsCRCFilter(),

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
index 643b2c3..129db1d 100644
--- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
+++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
@@ -219,7 +219,7 @@ public final class RecommenderJob extends AbstractJob {
 
     //start the multiplication of the co-occurrence matrix by the user vectors
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job partialMultiply = new Job(getConf(), "partialMultiply");
+      Job partialMultiply = Job.getInstance(getConf(), "partialMultiply");
       Configuration partialMultiplyConf = partialMultiply.getConfiguration();
 
       MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class,
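
For reference, a minimal Scala sketch of the Job.getInstance() factory the hunk above switches to; it replaces the deprecated new Job(Configuration, String) constructor while leaving the rest of the job setup unchanged. The object name is hypothetical.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.Job

    object JobSketch extends App {
      val conf = new Configuration()

      // Hadoop 2 style: static factory instead of the deprecated Job constructor.
      val partialMultiply: Job = Job.getInstance(conf, "partialMultiply")

      // Further setup reads the job's own copy of the configuration, as in the hunk above.
      val partialMultiplyConf = partialMultiply.getConfiguration
      println(partialMultiply.getJobName)
    }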

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java b/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java
index 5225222..0a39a1d 100644
--- a/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java
+++ b/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java
@@ -78,7 +78,7 @@ public abstract class AbstractFactorizer implements Factorizer {
   }
 
   private static FastByIDMap<Integer> createIDMapping(int size, LongPrimitiveIterator idIterator) {
-    FastByIDMap<Integer> mapping = new FastByIDMap<Integer>(size);
+    FastByIDMap<Integer> mapping = new FastByIDMap<>(size);
     int index = 0;
     while (idIterator.hasNext()) {
       mapping.put(idIterator.nextLong(), index++);

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
index c09dd83..4db8b17 100644
--- a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
+++ b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
@@ -17,14 +17,13 @@
 
 package org.apache.mahout.classifier.naivebayes;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -102,7 +101,9 @@ public final class BayesUtils {
     throws IOException {
     FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
     int i = 0;
-    try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class)) {
+    try (SequenceFile.Writer writer =
+           SequenceFile.createWriter(fs.getConf(), SequenceFile.Writer.file(indexPath),
+             SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(IntWritable.class))) {
       for (String label : labels) {
         writer.append(new Text(label), new IntWritable(i++));
       }
@@ -115,7 +116,9 @@ public final class BayesUtils {
     FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
     Collection<String> seen = new HashSet<>();
     int i = 0;
-    try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class)){
+    try (SequenceFile.Writer writer =
+           SequenceFile.createWriter(fs.getConf(), SequenceFile.Writer.file(indexPath),
+             SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(IntWritable.class))){
       for (Object label : labels) {
         String theLabel = SLASH.split(((Pair<?, ?>) label).getFirst().toString())[1];
         if (!seen.contains(theLabel)) {
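
For reference, a minimal Scala sketch of SequenceFile.createWriter() with Writer.Options, as used in the hunk above in place of the deprecated SequenceFile.Writer(FileSystem, Configuration, Path, keyClass, valueClass) constructor. The output path and object name are hypothetical.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}

    object WriterSketch extends App {
      val conf = new Configuration()
      val indexPath = new Path("/tmp/labelindex")   // hypothetical output path

      // Hadoop 2 style: build the writer from Writer.Options instead of the deprecated constructor.
      val writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(indexPath),
        SequenceFile.Writer.keyClass(classOf[Text]),
        SequenceFile.Writer.valueClass(classOf[IntWritable]))
      try {
        writer.append(new Text("label"), new IntWritable(0))
      } finally {
        writer.close()
      }
    }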

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
index 40ca2e9..4df869e 100644
--- a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
+++ b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
@@ -31,7 +31,7 @@ public class IndexInstancesMapper extends Mapper<Text, VectorWritable, IntWritab
 
   private static final Pattern SLASH = Pattern.compile("/");
 
-  public enum Counter { SKIPPED_INSTANCES }
+  enum Counter { SKIPPED_INSTANCES }
 
   private OpenObjectIntHashMap<String> labelIndex;
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58d32b2..0120a8a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,11 +118,11 @@
     <mscala.version>3.2.0</mscala.version>
     <hbase.version>1.0.0</hbase.version>
     <lucene.version>5.5.2</lucene.version>
-    <slf4j.version>1.7.19</slf4j.version>
+    <slf4j.version>1.7.21</slf4j.version>
     <scala.compat.version>2.10</scala.compat.version>
     <scala.version>2.10.4</scala.version>
     <spark.version>1.5.2</spark.version>
-    <flink.version>1.1.1</flink.version>
+    <flink.version>1.1.2</flink.version>
     <h2o.version>0.1.25</h2o.version>
     <jackson.version>2.7.4</jackson.version>
   </properties>

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
deleted file mode 100644
index 29599b8..0000000
--- a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.common
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
-import org.apache.spark.SparkContext
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
- * with Hadoop 2.0
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-
-  /** Read DRM header information off (H)DFS. */
-  override def readDrmHeader(path: String)(implicit sc: SparkContext): DrmMetadata = {
-
-    val dfsPath = new Path(path)
-
-    val fs = dfsPath.getFileSystem(sc.hadoopConfiguration)
-
-    // Apparently getFileSystem() doesn't set conf??
-    fs.setConf(sc.hadoopConfiguration)
-
-    val partFilePath:Path = fs.listStatus(dfsPath)
-
-        // Filter out anything starting with .
-        .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir }
-
-        // Take path
-        .map(_.getPath)
-
-        // Take only one, if any
-        .headOption
-
-        // Require there's at least one partition file found.
-        .getOrElse {
-      throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
-    }
-
-    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
-    try {
-      new DrmMetadata(
-        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
-        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
-      )
-    } finally {
-      reader.close()
-    }
-
-  }
-
-  /**
-   * Delete a path from the filesystem
-   * @param path
-   */
-  def delete(path: String) {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    if (fs.exists(dfsPath)) {
-      fs.delete(dfsPath, true)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
new file mode 100644
index 0000000..de601d5
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{SequenceFile, Writable}
+import org.apache.spark.SparkContext
+
+/**
+ * HDFS utilities backed by the Hadoop 2 API (SequenceFile.Reader built from
+ * Reader.Options rather than the deprecated Hadoop 1 constructors).
+ */
+object Hadoop2HDFSUtil extends HDFSUtil {
+
+
+  /** Read DRM header information off (H)DFS. */
+  override def readDrmHeader(path: String)(implicit sc: SparkContext): DrmMetadata = {
+
+    val dfsPath = new Path(path)
+
+    val fs = dfsPath.getFileSystem(sc.hadoopConfiguration)
+
+    // Apparently getFileSystem() doesn't set conf??
+    fs.setConf(sc.hadoopConfiguration)
+
+    val partFilePath:Path = fs.listStatus(dfsPath)
+
+        // Filter out anything starting with .
+        .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory }
+
+        // Take path
+        .map(_.getPath)
+
+        // Take only one, if any
+        .headOption
+
+        // Require there's at least one partition file found.
+        .getOrElse {
+      throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+    }
+
+    val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath))
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
+      )
+    } finally {
+      reader.close()
+    }
+
+  }
+
+  /**
+   * Delete a path from the filesystem
+   * @param path - hdfs path
+   */
+  def delete(path: String) {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    if (fs.exists(dfsPath)) {
+      fs.delete(dfsPath, true)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index eeed97a..d0e711a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.drivers
 
 import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, _}
-import org.apache.mahout.common.Hadoop1HDFSUtil
+import org.apache.mahout.common.Hadoop2HDFSUtil
 import org.apache.mahout.math.drm
 import org.apache.mahout.math.drm.DrmLike
 
@@ -95,7 +95,7 @@ object TrainNBDriver extends MahoutSparkDriver {
     val fullPathToModel = outputPath + NBModel.modelBaseDirectory
 
     if (overwrite) {
-       Hadoop1HDFSUtil.delete(fullPathToModel)
+       Hadoop2HDFSUtil.delete(fullPathToModel)
     }
 
     val trainingSet = readTrainingSet()

http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 47d14db..ee526c5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.sparkbindings
 
 import org.apache.hadoop.io._
-import org.apache.mahout.common.{HDFSUtil, Hadoop1HDFSUtil}
+import org.apache.mahout.common.{HDFSUtil, Hadoop2HDFSUtil}
 import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
@@ -39,7 +39,7 @@ import scala.reflect.ClassTag
 object SparkEngine extends DistributedEngine {
 
   // By default, use Hadoop 2 utils
-  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+  var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
 
   def colSums[K](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol