You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/05/22 00:05:59 UTC

mahout git commit: MAHOUT-1799:Read null row vectors from file in TextDelimeterReaderWriter driver, this closes apache/mahout#182

Repository: mahout
Updated Branches:
  refs/heads/master d38edb1cd -> bd1f7bdab


MAHOUT-1799:Read null row vectors from file in TextDelimeterReaderWriter driver, this closes apache/mahout#182


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/bd1f7bda
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/bd1f7bda
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/bd1f7bda

Branch: refs/heads/master
Commit: bd1f7bdabceeaaaffd3e7d9c372d40b3b714afc8
Parents: d38edb1
Author: smarthi <sm...@apache.org>
Authored: Sat May 21 20:05:41 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat May 21 20:05:41 2016 -0400

----------------------------------------------------------------------
 .../mahout/math/indexeddataset/Schema.scala     | 13 ++---
 .../drivers/TextDelimitedReaderWriter.scala     | 40 +++++++++------
 .../TextDelimitedReaderWriterSuite.scala        | 53 ++++++++++++++++++++
 3 files changed, 84 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/bd1f7bda/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
index 3b4a2e9..b7f120b 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -46,7 +46,7 @@ class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
  * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
  * <comma, tab, or space>here may be other ignored text...)
  */
-final object DefaultIndexedDatasetElementReadSchema extends Schema(
+object DefaultIndexedDatasetElementReadSchema extends Schema(
   "delim" -> "[,\t ]", //comma, tab or space
   "filter" -> "",
   "rowIDColumn" -> 0,
@@ -59,7 +59,7 @@ final object DefaultIndexedDatasetElementReadSchema extends Schema(
  * The default form:
  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
  */
-final object DefaultIndexedDatasetWriteSchema extends Schema(
+object DefaultIndexedDatasetWriteSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ",
@@ -70,10 +70,11 @@ final object DefaultIndexedDatasetWriteSchema extends Schema(
  * row-wise input. This tells the reader to input text lines of the form:
  * (rowID<tab>columnID1:score1,columnID2:score2,...)
  */
-final object DefaultIndexedDatasetReadSchema extends Schema(
+object DefaultIndexedDatasetReadSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ")
+  "elementDelim" -> " ",
+  "omitScore" -> false)
 
 /**
  * Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file  where
@@ -84,7 +85,7 @@ final object DefaultIndexedDatasetReadSchema extends Schema(
  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
  * output format for [[IndexedDatasetWriteBooleanSchema]]
  */
-final object IndexedDatasetReadBooleanSchema extends Schema(
+object IndexedDatasetReadBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ",
@@ -96,7 +97,7 @@ final object IndexedDatasetReadBooleanSchema extends Schema(
  * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] row of the form
  * (rowID<tab>columnID1<space>columnID2...)
  */
-final object IndexedDatasetWriteBooleanSchema extends Schema(
+object IndexedDatasetWriteBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ",

http://git-wip-us.apache.org/repos/asf/mahout/blob/bd1f7bda/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index d4f1aea..93d977b 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -149,7 +149,8 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
 
       // get row and column IDs
       val interactions = rows.map { row =>
-        row(0) -> row(1)// rowID token -> string of column IDs+strengths
+        // rowID token -> string of column IDs+strengths or null if empty (all elements zero)
+        row(0) -> (if (row.length > 1) row(1) else null)
       }
 
       interactions.cache()
@@ -159,12 +160,15 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
 
       // the columns are in a TD string so separate them and get unique ones
       val columnIDs = interactions.flatMap { case (_, columns) => columns
-        val elements = columns.split(elementDelim)
-        val colIDs = if (!omitScore)
-          elements.map( elem => elem.split(columnIdStrengthDelim)(0) )
-        else
-          elements
-        colIDs
+        if (columns == null) None
+        else {
+          val elements = columns.split(elementDelim)
+          val colIDs = if (!omitScore)
+            elements.map(elem => elem.split(columnIdStrengthDelim)(0))
+          else
+            elements
+          colIDs
+        }
       }.distinct().collect()
 
       // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
@@ -186,17 +190,21 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
         interactions.map { case (rowID, columns) =>
           val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1)
 
-          val elements = columns.split(elementDelim)
           val row = new RandomAccessSparseVector(ncol)
-          for (element <- elements) {
-            val id = if (omitScore) element else element.split(columnIdStrengthDelim)(0)
-            val columnID = columnIDDictionary_bcast.value.getOrElse(id, -1)
-            val strength = if (omitScore) 1.0d else {// if the input says not to omit but there is no seperator treat
-              // as omitting and return a strength of 1
-              if (element.split(columnIdStrengthDelim).size == 1) 1.0d
-              else element.split(columnIdStrengthDelim)(1).toDouble
+          if (columns != null) {
+            val elements = columns.split(elementDelim)
+            for (element <- elements) {
+              val id = if (omitScore) element else element.split(columnIdStrengthDelim)(0)
+              val columnID = columnIDDictionary_bcast.value.getOrElse(id, -1)
+              val strength = if (omitScore) 1.0d
+              else {
+                // if the input says not to omit but there is no seperator treat
+                // as omitting and return a strength of 1
+                if (element.split(columnIdStrengthDelim).size == 1) 1.0d
+                else element.split(columnIdStrengthDelim)(1).toDouble
+              }
+              row.setQuick(columnID, strength)
             }
-            row.setQuick(columnID, strength)
           }
           rowIndex -> row
         }

http://git-wip-us.apache.org/repos/asf/mahout/blob/bd1f7bda/spark/src/test/scala/org/apache/mahout/drivers/TextDelimitedReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/TextDelimitedReaderWriterSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/TextDelimitedReaderWriterSuite.scala
new file mode 100644
index 0000000..5d92cca
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/drivers/TextDelimitedReaderWriterSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetReadSchema
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+import org.scalatest.FunSuite
+
+import scala.collection.JavaConversions._
+
+class TextDelimitedReaderWriterSuite extends FunSuite with DistributedSparkSuite {
+  test("indexedDatasetDFSRead should read sparse matrix file with null rows") {
+    val OutFile = TmpDir + "similarity-matrices/part-00000"
+
+    val lines = Array(
+      "galaxy\tnexus:1.0",
+      "ipad\tiphone:2.0",
+      "nexus\tgalaxy:3.0",
+      "iphone\tipad:4.0",
+      "surface"
+    )
+    val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(OutFile)
+
+    val data = mahoutCtx.indexedDatasetDFSRead(OutFile, DefaultIndexedDatasetReadSchema)
+
+    data.rowIDs.toMap.keySet should equal(Set("galaxy", "ipad", "nexus", "iphone", "surface"))
+    data.columnIDs.toMap.keySet should equal(Set("nexus", "iphone", "galaxy", "ipad"))
+
+    val a = data.matrix.collect
+    a.setRowLabelBindings(mapAsJavaMap(data.rowIDs.toMap).asInstanceOf[java.util.Map[java.lang.String, java.lang.Integer]])
+    a.setColumnLabelBindings(mapAsJavaMap(data.columnIDs.toMap).asInstanceOf[java.util.Map[java.lang.String, java.lang.Integer]])
+    a.get("galaxy", "nexus") should equal(1.0)
+    a.get("ipad", "iphone") should equal(2.0)
+    a.get("nexus", "galaxy") should equal(3.0)
+    a.get("iphone", "ipad") should equal(4.0)
+  }
+}