Posted to commits@mahout.apache.org by sm...@apache.org on 2016/09/07 04:33:15 UTC
mahout git commit: MAHOUT-1865: Remove Hadoop1 Profile, this closes apache/mahout#253
Repository: mahout
Updated Branches:
refs/heads/master c5934c2f7 -> e73fdb869
MAHOUT-1865: Remove Hadoop1 Profile, this closes apache/mahout#253
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e73fdb86
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e73fdb86
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e73fdb86
Branch: refs/heads/master
Commit: e73fdb8694e80e1e95a1213097434749726fd8af
Parents: c5934c2
Author: smarthi <sm...@apache.org>
Authored: Wed Sep 7 00:32:50 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Wed Sep 7 00:32:50 2016 -0400
----------------------------------------------------------------------
.../flinkbindings/io/Hadoop2HDFSUtil.scala | 18 ++---
.../mahout/h2o/common/Hadoop1HDFSUtil.scala | 63 ---------------
.../mahout/h2o/common/Hadoop2HDFSUtil.scala | 63 +++++++++++++++
.../apache/mahout/h2obindings/H2OEngine.scala | 4 +-
.../apache/mahout/cf/taste/hadoop/als/ALS.java | 5 +-
.../hadoop/als/FactorizationEvaluator.java | 2 +-
.../cf/taste/hadoop/item/RecommenderJob.java | 2 +-
.../recommender/svd/AbstractFactorizer.java | 2 +-
.../classifier/naivebayes/BayesUtils.java | 11 ++-
.../training/IndexInstancesMapper.java | 2 +-
pom.xml | 4 +-
.../apache/mahout/common/Hadoop1HDFSUtil.scala | 83 --------------------
.../apache/mahout/common/Hadoop2HDFSUtil.scala | 83 ++++++++++++++++++++
.../apache/mahout/drivers/TrainNBDriver.scala | 4 +-
.../mahout/sparkbindings/SparkEngine.scala | 4 +-
15 files changed, 173 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
index 9b67913..211088a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
@@ -20,16 +20,14 @@ package org.apache.mahout.flinkbindings.io
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
+import org.apache.hadoop.io.SequenceFile.Reader
+import org.apache.hadoop.io.Writable
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies.
- */
object Hadoop2HDFSUtil extends HDFSUtil {
/**
* Read the header of a sequence file and determine the Key and Value type
- * @param path
+ * @param path - hdfs path of Sequence File
* @return
*/
def readDrmHeader(path: String): DrmMetadata = {
@@ -43,7 +41,7 @@ object Hadoop2HDFSUtil extends HDFSUtil {
// Filter out anything starting with .
.filter { s =>
- !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir
+ !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory
}
// Take path
@@ -57,12 +55,8 @@ object Hadoop2HDFSUtil extends HDFSUtil {
throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
}
- // flink is retiring hadoop 1
- val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+ val reader = new Reader(fs.getConf, Reader.file(partFilePath))
- // hadoop 2 reader
-// val reader: SequenceFile.Reader = new SequenceFile.Reader(fs.getConf,
-// SequenceFile.Reader.file(partFilePath));
try {
new DrmMetadata(
keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
@@ -75,7 +69,7 @@ object Hadoop2HDFSUtil extends HDFSUtil {
/**
* Delete a path from the filesystem
- * @param path
+ * @param path - hdfs path
*/
def delete(path: String) {
val dfsPath = new Path(path)
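
The substantive change above is the move off the Hadoop 1 SequenceFile.Reader constructor (which took a FileSystem) onto the Hadoop 2 form that takes a Configuration plus Reader.Option values; the isDir-to-isDirectory rename is the same deprecation cleanup on FileStatus. A minimal Java sketch of the constructor change, assuming a placeholder part file "part-00000" that is not a path from this commit:

// Minimal sketch of the Hadoop 1 -> Hadoop 2 Reader migration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class ReaderMigration {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path part = new Path("part-00000");  // placeholder, not from the commit

    // Hadoop 1 style, deprecated in Hadoop 2:
    //   new SequenceFile.Reader(fs, part, conf);
    // Hadoop 2 style: a Configuration plus Reader.Option values.
    SequenceFile.Reader reader =
        new SequenceFile.Reader(conf, SequenceFile.Reader.file(part));
    try {
      // Reading the DRM header only needs the key/value classes.
      System.out.println(reader.getKeyClass() + " / " + reader.getValueClass());
    } finally {
      reader.close();
    }
  }
}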
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
deleted file mode 100644
index a540cb1..0000000
--- a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.h2o.common
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
- * with Hadoop 2.0
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-
- def readDrmHeader(path: String): DrmMetadata = {
- val dfsPath = new Path(path)
- val fs = dfsPath.getFileSystem(new Configuration())
-
- val partFilePath:Path = fs.listStatus(dfsPath)
-
- // Filter out anything starting with .
- .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir }
-
- // Take path
- .map(_.getPath)
-
- // Take only one, if any
- .headOption
-
- // Require there's at least one partition file found.
- .getOrElse {
- throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
- }
-
- val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
- try {
- new DrmMetadata(
- keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
- valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
- )
- } finally {
- reader.close()
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
new file mode 100644
index 0000000..4053d09
--- /dev/null
+++ b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.h2o.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{SequenceFile, Writable}
+
+/**
+ * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
+ * with Hadoop 2.0
+ */
+object Hadoop2HDFSUtil extends HDFSUtil {
+
+
+ def readDrmHeader(path: String): DrmMetadata = {
+ val dfsPath = new Path(path)
+ val fs = dfsPath.getFileSystem(new Configuration())
+
+ val partFilePath:Path = fs.listStatus(dfsPath)
+
+ // Filter out anything starting with .
+ .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory }
+
+ // Take path
+ .map(_.getPath)
+
+ // Take only one, if any
+ .headOption
+
+ // Require there's at least one partition file found.
+ .getOrElse {
+ throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+ }
+
+ val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath))
+ try {
+ new DrmMetadata(
+ keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+ valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
+ )
+ } finally {
+ reader.close()
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 60bf7ac..494e8a8 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -25,7 +25,7 @@ import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.logical._
import org.apache.mahout.h2obindings.ops._
import org.apache.mahout.h2obindings.drm._
-import org.apache.mahout.h2o.common.{Hadoop1HDFSUtil, HDFSUtil}
+import org.apache.mahout.h2o.common.{Hadoop2HDFSUtil, HDFSUtil}
import org.apache.mahout.logging._
/** H2O specific non-DRM operations */
@@ -34,7 +34,7 @@ object H2OEngine extends DistributedEngine {
private final implicit val log = getLog(H2OEngine.getClass)
// By default, use Hadoop 1 utils
- var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+ var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
def colMeans[K](drm: CheckpointedDrm[K]): Vector =
H2OHelper.colMeans(drm.h2odrm.frame)
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
index 1c07b67..4bb95ae 100644
--- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
+++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
@@ -17,12 +17,11 @@
package org.apache.mahout.cf.taste.hadoop.als;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -64,7 +63,7 @@ final class ALS {
LocalFileSystem localFs = FileSystem.getLocal(conf);
for (Path cachedFile : cachedFiles) {
- try (SequenceFile.Reader reader = new SequenceFile.Reader(localFs, cachedFile, conf)){
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(localFs.getConf(), SequenceFile.Reader.file(cachedFile))) {
while (reader.next(rowIndex, row)) {
featureMatrix.put(rowIndex.get(), row.get());
}
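
ALS makes the same Reader switch in Java, keeping try-with-resources so the reader is closed even if next() throws. A hedged sketch of that read loop, using plain Text/IntWritable placeholders rather than the Writables ALS actually iterates:

// Sketch only: the key/value types here are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class CachedFileReader {
  public static void readAll(Configuration conf, Path cachedFile) throws Exception {
    Text key = new Text();
    IntWritable value = new IntWritable();
    // try-with-resources closes the reader on both normal and exceptional exit.
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(cachedFile))) {
      while (reader.next(key, value)) {
        System.out.println(key + " -> " + value.get());
      }
    }
  }
}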
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
index e69053c..4e6aaf5 100644
--- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
+++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
@@ -112,7 +112,7 @@ public class FactorizationEvaluator extends AbstractJob {
return 0;
}
- double computeRmse(Path errors) {
+ private double computeRmse(Path errors) {
RunningAverage average = new FullRunningAverage();
for (Pair<DoubleWritable,NullWritable> entry
: new SequenceFileDirIterable<DoubleWritable, NullWritable>(errors, PathType.LIST, PathFilters.logsCRCFilter(),
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
index 643b2c3..129db1d 100644
--- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
+++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
@@ -219,7 +219,7 @@ public final class RecommenderJob extends AbstractJob {
//start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- Job partialMultiply = new Job(getConf(), "partialMultiply");
+ Job partialMultiply = Job.getInstance(getConf(), "partialMultiply");
Configuration partialMultiplyConf = partialMultiply.getConfiguration();
MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class,
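
new Job(Configuration, String) is deprecated in the Hadoop 2 MapReduce API in favor of the Job.getInstance factory method, which is what the hunk above adopts. A minimal sketch of the replacement (the job name comes from the hunk; everything else is placeholder):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class JobFactory {
  public static Job partialMultiplyJob(Configuration conf) throws IOException {
    // Deprecated Hadoop 1 style: new Job(conf, "partialMultiply")
    // Hadoop 2 replacement:
    return Job.getInstance(conf, "partialMultiply");
  }
}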
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java b/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java
index 5225222..0a39a1d 100644
--- a/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java
+++ b/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java
@@ -78,7 +78,7 @@ public abstract class AbstractFactorizer implements Factorizer {
}
private static FastByIDMap<Integer> createIDMapping(int size, LongPrimitiveIterator idIterator) {
- FastByIDMap<Integer> mapping = new FastByIDMap<Integer>(size);
+ FastByIDMap<Integer> mapping = new FastByIDMap<>(size);
int index = 0;
while (idIterator.hasNext()) {
mapping.put(idIterator.nextLong(), index++);
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
index c09dd83..4db8b17 100644
--- a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
+++ b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
@@ -17,14 +17,13 @@
package org.apache.mahout.classifier.naivebayes;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -102,7 +101,9 @@ public final class BayesUtils {
throws IOException {
FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
int i = 0;
- try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class)) {
+ try (SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs.getConf(), SequenceFile.Writer.file(indexPath),
+ SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(IntWritable.class))) {
for (String label : labels) {
writer.append(new Text(label), new IntWritable(i++));
}
@@ -115,7 +116,9 @@ public final class BayesUtils {
FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
Collection<String> seen = new HashSet<>();
int i = 0;
- try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class)){
+ try (SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs.getConf(), SequenceFile.Writer.file(indexPath),
+ SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(IntWritable.class))){
for (Object label : labels) {
String theLabel = SLASH.split(((Pair<?, ?>) label).getFirst().toString())[1];
if (!seen.contains(theLabel)) {
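
Writer construction migrates the same way: the FileSystem-based SequenceFile.Writer constructor gives way to the SequenceFile.createWriter(Configuration, Writer.Option...) factory seen in the hunk above. A short sketch with a placeholder index path and label list:

// Sketch of the Hadoop 2 writer factory; path and labels are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class IndexWriter {
  public static void writeLabels(Configuration conf, Path indexPath, String[] labels)
      throws Exception {
    try (SequenceFile.Writer writer =
             SequenceFile.createWriter(conf,
                 SequenceFile.Writer.file(indexPath),
                 SequenceFile.Writer.keyClass(Text.class),
                 SequenceFile.Writer.valueClass(IntWritable.class))) {
      int i = 0;
      for (String label : labels) {
        writer.append(new Text(label), new IntWritable(i++));
      }
    }
  }
}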
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
index 40ca2e9..4df869e 100644
--- a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
+++ b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
@@ -31,7 +31,7 @@ public class IndexInstancesMapper extends Mapper<Text, VectorWritable, IntWritab
private static final Pattern SLASH = Pattern.compile("/");
- public enum Counter { SKIPPED_INSTANCES }
+ enum Counter { SKIPPED_INSTANCES }
private OpenObjectIntHashMap<String> labelIndex;
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58d32b2..0120a8a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,11 +118,11 @@
<mscala.version>3.2.0</mscala.version>
<hbase.version>1.0.0</hbase.version>
<lucene.version>5.5.2</lucene.version>
- <slf4j.version>1.7.19</slf4j.version>
+ <slf4j.version>1.7.21</slf4j.version>
<scala.compat.version>2.10</scala.compat.version>
<scala.version>2.10.4</scala.version>
<spark.version>1.5.2</spark.version>
- <flink.version>1.1.1</flink.version>
+ <flink.version>1.1.2</flink.version>
<h2o.version>0.1.25</h2o.version>
<jackson.version>2.7.4</jackson.version>
</properties>
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
deleted file mode 100644
index 29599b8..0000000
--- a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.common
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
-import org.apache.spark.SparkContext
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
- * with Hadoop 2.0
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-
- /** Read DRM header information off (H)DFS. */
- override def readDrmHeader(path: String)(implicit sc: SparkContext): DrmMetadata = {
-
- val dfsPath = new Path(path)
-
- val fs = dfsPath.getFileSystem(sc.hadoopConfiguration)
-
- // Apparently getFileSystem() doesn't set conf??
- fs.setConf(sc.hadoopConfiguration)
-
- val partFilePath:Path = fs.listStatus(dfsPath)
-
- // Filter out anything starting with .
- .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir }
-
- // Take path
- .map(_.getPath)
-
- // Take only one, if any
- .headOption
-
- // Require there's at least one partition file found.
- .getOrElse {
- throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
- }
-
- val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
- try {
- new DrmMetadata(
- keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
- valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
- )
- } finally {
- reader.close()
- }
-
- }
-
- /**
- * Delete a path from the filesystem
- * @param path
- */
- def delete(path: String) {
- val dfsPath = new Path(path)
- val fs = dfsPath.getFileSystem(new Configuration())
-
- if (fs.exists(dfsPath)) {
- fs.delete(dfsPath, true)
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
new file mode 100644
index 0000000..de601d5
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{SequenceFile, Writable}
+import org.apache.spark.SparkContext
+
+/**
+ * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
+ * with Hadoop 2.0
+ */
+object Hadoop2HDFSUtil extends HDFSUtil {
+
+
+ /** Read DRM header information off (H)DFS. */
+ override def readDrmHeader(path: String)(implicit sc: SparkContext): DrmMetadata = {
+
+ val dfsPath = new Path(path)
+
+ val fs = dfsPath.getFileSystem(sc.hadoopConfiguration)
+
+ // Apparently getFileSystem() doesn't set conf??
+ fs.setConf(sc.hadoopConfiguration)
+
+ val partFilePath:Path = fs.listStatus(dfsPath)
+
+ // Filter out anything starting with .
+ .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory }
+
+ // Take path
+ .map(_.getPath)
+
+ // Take only one, if any
+ .headOption
+
+ // Require there's at least one partition file found.
+ .getOrElse {
+ throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+ }
+
+ val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath))
+ try {
+ new DrmMetadata(
+ keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+ valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
+ )
+ } finally {
+ reader.close()
+ }
+
+ }
+
+ /**
+ * Delete a path from the filesystem
+ * @param path - hdfs path
+ */
+ def delete(path: String) {
+ val dfsPath = new Path(path)
+ val fs = dfsPath.getFileSystem(new Configuration())
+
+ if (fs.exists(dfsPath)) {
+ fs.delete(dfsPath, true)
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index eeed97a..d0e711a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -18,7 +18,7 @@
package org.apache.mahout.drivers
import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, _}
-import org.apache.mahout.common.Hadoop1HDFSUtil
+import org.apache.mahout.common.Hadoop2HDFSUtil
import org.apache.mahout.math.drm
import org.apache.mahout.math.drm.DrmLike
@@ -95,7 +95,7 @@ object TrainNBDriver extends MahoutSparkDriver {
val fullPathToModel = outputPath + NBModel.modelBaseDirectory
if (overwrite) {
- Hadoop1HDFSUtil.delete(fullPathToModel)
+ Hadoop2HDFSUtil.delete(fullPathToModel)
}
val trainingSet = readTrainingSet()
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 47d14db..ee526c5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -18,7 +18,7 @@
package org.apache.mahout.sparkbindings
import org.apache.hadoop.io._
-import org.apache.mahout.common.{HDFSUtil, Hadoop1HDFSUtil}
+import org.apache.mahout.common.{HDFSUtil, Hadoop2HDFSUtil}
import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
import org.apache.mahout.math._
import org.apache.mahout.math.drm._
@@ -39,7 +39,7 @@ import scala.reflect.ClassTag
object SparkEngine extends DistributedEngine {
// By default, use Hadoop 1 utils
- var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+ var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
def colSums[K](drm: CheckpointedDrm[K]): Vector = {
val n = drm.ncol