You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2016/10/11 15:10:59 UTC
[1/3] mahout git commit: added tests for new CCO data filtering to
the minimum subset needed
Repository: mahout
Updated Branches:
refs/heads/master 20fdf9b9f -> 1f5e36f24
added tests for new CCO data filtering to the minimum subset needed
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/220c4749
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/220c4749
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/220c4749
Branch: refs/heads/master
Commit: 220c47493d30c0928e116af9210ab1786068ab13
Parents: b5fe4aa
Author: pferrel <pa...@occamsmachete.com>
Authored: Sat Oct 1 14:02:50 2016 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Sat Oct 1 14:02:50 2016 -0700
----------------------------------------------------------------------
.../indexeddataset/IndexedDatasetSpark.scala | 45 +++++++++++-----
.../mahout/cf/SimilarityAnalysisSuite.scala | 55 ++++++++++++++++++++
2 files changed, 87 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/220c4749/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
index 727a95e..e7111a8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -30,7 +30,8 @@ import org.apache.spark.SparkContext._
/**
* Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific
* dfsWrite method
- * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
+ *
+ * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
* @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
* @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
*/
@@ -53,9 +54,8 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio
new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
}
- /**
- * Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
- * replace the writer to change how it is written.
+ /** Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
+ * replace the writer to change how it is written.
*/
override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
(implicit sc: DistributedContext):
@@ -65,31 +65,50 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio
}
}
+/** This is a companion object used to build an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
+ * The most important oddity is that it takes a BiDictionary of row-ids optionally. If provided, no row with another
+ * id will be added to the dataset. This is useful for cooccurrence type calculations where all arrays must have
+ * the same rows and there is some record of which rows are important.
+ */
object IndexedDatasetSpark {
def apply(elements: RDD[(String, String)], existingRowIDs: Option[BiDictionary] = None)(implicit sc: SparkContext) = {
+ // broadcast the existing dictionary or create a new one, also filter by the existing dictionary or use all elements
+ val (filteredElements, rowIDDictionary_bcast, rowIDDictionary) = if (existingRowIDs.isEmpty) {
+ val newRowIDDictionary = new BiDictionary(elements.map { case (rowID, _) => rowID }.distinct().collect())
+ val newRowIDDictionary_bcast = sc.broadcast(newRowIDDictionary)
+ (elements, newRowIDDictionary_bcast, newRowIDDictionary)
+ } else {
+ val existingRowIDDictionary_bcast = sc.broadcast(existingRowIDs.get)
+ val elementsRDD = elements.filter{ case (rowID, _) =>
+ existingRowIDDictionary_bcast.value.contains(rowID)
+ }
+ (elementsRDD, existingRowIDDictionary_bcast, existingRowIDs.get)
+ }
+
// create separate collections of rowID and columnID tokens
- val rowIDs = elements.map { case (rowID, _) => rowID }.distinct().collect()
- val columnIDs = elements.map { case (_, columnID) => columnID }.distinct().collect()
+ // use the dictionary passed in or create one from the element ids
+ // val rowIDs = filteredElements.map { case (rowID, _) => rowID }.distinct().collect()
+ val columnIDs = filteredElements.map { case (_, columnID) => columnID }.distinct().collect()
// create BiDictionary(s) for bi-directional lookup of ID by either Mahout ID or external ID
// broadcast them for access in distributed processes, so they are not recalculated in every task.
//val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs)
- val rowIDDictionary = existingRowIDs match {
- case Some(d) => d.merge(rowIDs)
- case None => new BiDictionary(rowIDs)
- }
- val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
+ //val rowIDDictionary = if (existingRowIDs.isEmpty)
+ // case Some(d) => d
+ // case None => new BiDictionary(filteredElements.map { case (rowID, _) => rowID }.distinct().collect())
+ // }
+ //val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
val columnIDDictionary = new BiDictionary(keys = columnIDs)
val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary)
val ncol = columnIDDictionary.size
- val nrow = rowIDDictionary.size
+ //val nrow = rowIDDictionary.size
val indexedInteractions =
- elements.map { case (rowID, columnID) =>
+ filteredElements.map { case (rowID, columnID) =>
val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1)
val columnIndex = columnIDDictionary_bcast.value.getOrElse(columnID, -1)
http://git-wip-us.apache.org/repos/asf/mahout/blob/220c4749/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
index 63e0df7..945b443 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
@@ -21,9 +21,13 @@ import org.apache.mahout.math.cf.{DownsamplableCrossOccurrenceDataset, Similarit
import org.apache.mahout.math.drm._
import org.apache.mahout.math.indexeddataset.BiDictionary
import org.apache.mahout.math.scalabindings.{MatrixOps, _}
+import org.apache.mahout.sparkbindings.SparkDistributedContext
+import org.apache.mahout.sparkbindings.dc2sc
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
import org.apache.mahout.test.MahoutSuite
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
/* values
@@ -251,6 +255,57 @@ class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with Distributed
n should be < 1E-10
}
+ test("Cross-occurrence two IndexedDatasets different row ranks"){
+
+ val sc = dc2sc(mahoutCtx)
+
+
+ /*
+ val a = dense(
+ "u1"(1, 1, 0, 0, 0),
+ "u2"(0, 0, 1, 1, 0),
+ "u3"(0, 0, 0, 0, 1),
+ "u4"(1, 0, 0, 1, 0))
+
+ val b = dense(
+ "u1"(0, 1, 1, 0),
+ "u2"(1, 1, 1, 0),
+ "u3"(0, 0, 1, 0),
+ "u4"(1, 1, 0, 1)
+ "u5"(1, 1, 1, 1))
+*/
+ val pairsA = Seq(
+ ("u1","a1"), ("u1","a2"),
+ ("u2","a3"), ("u2","a4"),
+ ("u3","a5"),
+ ("u4","a1"), ("u4","a4"))
+
+ val pairsB = Seq(
+ ("u1","b2"), ("u1","b3"),
+ ("u2","b1"), ("u2","b2"), ("u2","b3"),
+ ("u3","b2"),
+ ("u4","b1"), ("u4","b2"), ("u4","b4"),
+ ("u5","b1"), ("u5", "b25"))
+
+
+
+ val pairRDDA = sc.parallelize(pairsA, 4)
+ val pairRDDB = sc.parallelize(pairsB, 4)
+
+ val aID = IndexedDatasetSpark(pairRDDA)(sc)
+ val bID = IndexedDatasetSpark(pairRDDB, Some(aID.rowIDs))(sc)
+
+ assert(aID.rowIDs.size == 4)
+ assert(bID.rowIDs.size == 4)
+
+ assert(aID.matrix.nrow == 4)
+ assert(bID.matrix.nrow == 4)
+
+ assert(!bID.rowIDs.contains("u5"))// this row id should be filtered out of the drm and dictionary
+ assert(!bID.columnIDs.contains("b25"))// this column id should be filtered out of the drm and dictionary
+
+ }
+
test("Cross-occurrence two IndexedDatasets LLR threshold"){
val a = dense(
(1, 1, 0, 0, 0),
[2/3] mahout git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/mahout
Posted by pa...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/mahout
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/c9ee7282
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/c9ee7282
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/c9ee7282
Branch: refs/heads/master
Commit: c9ee7282d0c695a6eb76dda9590a5386309c758a
Parents: 220c474 20fdf9b
Author: pferrel <pa...@occamsmachete.com>
Authored: Mon Oct 10 18:24:21 2016 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Mon Oct 10 18:24:21 2016 -0700
----------------------------------------------------------------------
.travis.yml | 5 +-
README.md | 17 ++++
doap_Mahout.rdf | 4 +-
.../sgd/AdaptiveLogisticModelParameters.java | 2 +-
.../classifier/sgd/RunAdaptiveLogistic.java | 2 +-
.../org/apache/mahout/h2obindings/ops/Ax.java | 2 +-
.../h2obindings/ops/TimesRightMatrix.java | 4 +-
.../WikipediaDatasetCreatorDriver.java | 2 +-
.../utils/clustering/GraphMLClusterWriter.java | 2 +-
.../mahout/utils/regex/RegexMapperTest.java | 2 +-
.../apache/mahout/collections/Arithmetic.java | 39 +++++----
.../org/apache/mahout/math/AbstractMatrix.java | 2 +-
.../org/apache/mahout/math/AbstractVector.java | 6 +-
.../mahout/math/CholeskyDecomposition.java | 2 +-
.../org/apache/mahout/math/ConstantVector.java | 8 +-
.../math/FileBasedSparseBinaryMatrix.java | 6 +-
.../java/org/apache/mahout/math/Matrices.java | 4 +-
.../java/org/apache/mahout/math/Matrix.java | 8 +-
.../apache/mahout/math/MatrixVectorView.java | 8 +-
.../java/org/apache/mahout/math/MurmurHash.java | 4 +-
.../apache/mahout/math/OldQRDecomposition.java | 2 +-
.../apache/mahout/math/PermutedVectorView.java | 6 +-
.../org/apache/mahout/math/QRDecomposition.java | 4 +-
.../org/apache/mahout/math/SparseMatrix.java | 2 +-
.../java/org/apache/mahout/math/Vector.java | 35 +++++---
.../java/org/apache/mahout/math/VectorView.java | 2 +-
...itFeedbackAlternatingLeastSquaresSolver.java | 2 +-
.../math/decomposer/lanczos/LanczosSolver.java | 11 +--
.../apache/mahout/math/function/Functions.java | 86 ++++++++++++++------
.../apache/mahout/math/jet/math/Arithmetic.java | 13 ++-
.../apache/mahout/math/jet/random/Gamma.java | 6 +-
.../math/jet/random/engine/MersenneTwister.java | 82 +++++++++----------
.../math/jet/random/sampling/RandomSampler.java | 7 +-
.../mahout/math/jet/stat/Probability.java | 14 ++--
.../apache/mahout/math/list/AbstractList.java | 2 +-
.../mahout/math/list/ObjectArrayList.java | 8 +-
.../org/apache/mahout/math/map/OpenHashMap.java | 6 +-
.../apache/mahout/math/random/IndianBuffet.java | 4 +-
.../mahout/math/random/PoissonSampler.java | 6 +-
.../org/apache/mahout/math/set/OpenHashSet.java | 6 +-
.../java/org/apache/mahout/math/VectorTest.java | 2 +-
.../als/AlternatingLeastSquaresSolverTest.java | 2 +-
.../mahout/math/list/ObjectArrayListTest.java | 6 +-
.../mahout/math/random/MultinomialTest.java | 20 ++---
.../mahout/math/randomized/RandomBlasting.java | 14 ++--
.../mahout/cf/taste/impl/common/Cache.java | 2 +-
.../cf/taste/impl/recommender/NullRescorer.java | 2 +-
.../mahout/classifier/df/split/OptIgSplit.java | 2 +-
.../classifier/df/tools/TreeVisualizer.java | 2 +-
.../naivebayes/training/TrainNaiveBayesJob.java | 2 +-
.../classify/ClusterClassificationDriver.java | 2 +-
.../classify/ClusterClassificationMapper.java | 2 +-
.../postprocessor/ClusterCountReader.java | 2 +-
.../org/apache/mahout/common/AbstractJob.java | 2 +-
.../org/apache/mahout/common/HadoopUtil.java | 4 +-
.../common/iterator/SamplingIterable.java | 4 +-
.../taste/hadoop/item/RecommenderJobTest.java | 14 ++--
.../mahout/cf/taste/impl/TasteTestCase.java | 4 +-
.../mahout/cf/taste/impl/common/CacheTest.java | 4 +-
.../cf/taste/impl/common/FastByIDMapTest.java | 16 ++--
.../cf/taste/impl/common/FastMapTest.java | 20 ++---
...lusAnonymousConcurrentUserDataModelTest.java | 10 +--
.../SamplingCandidateItemsStrategyTest.java | 2 +-
.../recommender/svd/ALSWRFactorizerTest.java | 2 +-
.../svd/FilePersistenceStrategyTest.java | 4 +-
.../svd/ParallelSGDFactorizerTest.java | 6 +-
.../training/IndexInstancesMapperTest.java | 2 +-
.../clustering/canopy/TestCanopyCreation.java | 30 +++----
.../clustering/kmeans/TestKmeansClustering.java | 10 +--
.../spectral/TestAffinityMatrixInputJob.java | 6 +-
.../spectral/TestMatrixDiagonalizeJob.java | 6 +-
.../spectral/TestUnitVectorizerJob.java | 2 +-
.../clustering/spectral/TestVectorCache.java | 2 +-
.../TestVectorMatrixMultiplicationJob.java | 2 +-
.../spectral/kmeans/TestEigenSeedGenerator.java | 2 +-
.../clustering/streaming/cluster/DataUtils.java | 2 +-
.../mapreduce/StreamingKMeansTestMR.java | 5 +-
.../postprocessor/ClusterCountReaderTest.java | 2 +-
.../ClusterOutputPostProcessorTest.java | 2 +-
.../mahout/common/DummyOutputCollector.java | 2 +-
.../mahout/common/DummyRecordWriterTest.java | 2 +-
.../common/iterator/TestFixedSizeSampler.java | 2 +-
.../common/iterator/TestSamplingIterator.java | 14 ++--
.../iterator/TestStableFixedSizeSampler.java | 2 +-
.../mahout/ep/EvolutionaryProcessTest.java | 4 +-
.../TestVectorDistanceSimilarityJob.java | 2 +-
.../math/neighborhood/SearchQualityTest.java | 4 +-
.../vectorizer/DictionaryVectorizerTest.java | 2 +-
.../EncodedVectorsFromSequenceFilesTest.java | 2 +-
89 files changed, 384 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
[3/3] mahout git commit: MAHOUT-1883 closes no PR,
adds dataset filtering for minimal needed to do cross-occurrence
Posted by pa...@apache.org.
MAHOUT-1883 closes no PR, adds dataset filtering for minimal needed to do cross-occurrence
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/1f5e36f2
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/1f5e36f2
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/1f5e36f2
Branch: refs/heads/master
Commit: 1f5e36f249aabc68495ec15f64f5ed6754d9f1e2
Parents: c9ee728
Author: pferrel <pa...@occamsmachete.com>
Authored: Tue Oct 11 08:10:31 2016 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Tue Oct 11 08:10:31 2016 -0700
----------------------------------------------------------------------
buildtools/pom.xml | 2 +-
distribution/pom.xml | 2 +-
examples/pom.xml | 2 +-
flink/pom.xml | 2 +-
h2o/pom.xml | 2 +-
hdfs/pom.xml | 2 +-
integration/pom.xml | 2 +-
math-scala/pom.xml | 2 +-
math/pom.xml | 2 +-
mr/pom.xml | 2 +-
pom.xml | 2 +-
spark-shell/pom.xml | 2 +-
spark/pom.xml | 2 +-
.../indexeddataset/IndexedDatasetSpark.scala | 38 +++++++++-----------
.../mahout/cf/SimilarityAnalysisSuite.scala | 20 +++++++++++
15 files changed, 50 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/buildtools/pom.xml
----------------------------------------------------------------------
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 136d13c..c96b3a5 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -29,7 +29,7 @@
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-buildtools</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<name>Mahout Build Tools</name>
<packaging>jar</packaging>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 46bfedf..536c76f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>apache-mahout-distribution</artifactId>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index a7838b7..b3bf827 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 8a6ae55..7857210 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/h2o/pom.xml
----------------------------------------------------------------------
diff --git a/h2o/pom.xml b/h2o/pom.xml
index 1ad7779..f5095bb 100644
--- a/h2o/pom.xml
+++ b/h2o/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hdfs/pom.xml b/hdfs/pom.xml
index 64e1b32..50fe3b7 100644
--- a/hdfs/pom.xml
+++ b/hdfs/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
index ffa1adc..d9945d9 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index f40bdc8..9eb7e80 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/math/pom.xml
----------------------------------------------------------------------
diff --git a/math/pom.xml b/math/pom.xml
index 2f5752d..f0ddff1 100644
--- a/math/pom.xml
+++ b/math/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/mr/pom.xml
----------------------------------------------------------------------
diff --git a/mr/pom.xml b/mr/pom.xml
index 00119c9..d79c008 100644
--- a/mr/pom.xml
+++ b/mr/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0120a8a..9af14ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache Mahout</name>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/spark-shell/pom.xml
----------------------------------------------------------------------
diff --git a/spark-shell/pom.xml b/spark-shell/pom.xml
index 878e70d..732c39b 100644
--- a/spark-shell/pom.xml
+++ b/spark-shell/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 94a73b3..5fc9863 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout</artifactId>
- <version>0.12.3-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
index e7111a8..0249d9b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -30,8 +30,8 @@ import org.apache.spark.SparkContext._
/**
* Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific
* dfsWrite method
- *
- * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
+ *
+ * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
* @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
* @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
*/
@@ -54,8 +54,9 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio
new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
}
- /** Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
- * replace the writer to change how it is written.
+ /**
+ * Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
+ * replace the writer to change how it is written.
*/
override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
(implicit sc: DistributedContext):
@@ -65,16 +66,21 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio
}
}
-/** This is a companion object used to build an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
- * The most important oddity is that it takes a BiDictionary of row-ids optionally. If provided, no row with another
- * id will be added to the dataset. This is useful for cooccurrence type calculations where all arrays must have
- * the same rows and there is some record of which rows are important.
+/**
+ * This is a companion object used to build an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
+ * The most important oddity is that it takes a BiDictionary of row-ids optionally. If provided, no row with another
+ * id will be added to the dataset. This is useful for cooccurrence type calculations where all arrays must have
+ * the same rows and there is some record of which rows are important.
*/
object IndexedDatasetSpark {
def apply(elements: RDD[(String, String)], existingRowIDs: Option[BiDictionary] = None)(implicit sc: SparkContext) = {
+ // todo: a further optimization is to return any broadcast dictionaries so they can be passed in and
+ // do not get broadcast again. At present there may be duplicate broadcasts.
- // broadcast the existing dictionary or create a new one, also filter by the existing dictionary or use all elements
+ // create separate collections of rowID and columnID tokens
+ // use the dictionary passed in or create one from the element ids
+ // broadcast the correct row id BiDictionary
val (filteredElements, rowIDDictionary_bcast, rowIDDictionary) = if (existingRowIDs.isEmpty) {
val newRowIDDictionary = new BiDictionary(elements.map { case (rowID, _) => rowID }.distinct().collect())
val newRowIDDictionary_bcast = sc.broadcast(newRowIDDictionary)
@@ -87,20 +93,10 @@ object IndexedDatasetSpark {
(elementsRDD, existingRowIDDictionary_bcast, existingRowIDs.get)
}
- // create separate collections of rowID and columnID tokens
- // use the dictionary passed in or create one from the element ids
- // val rowIDs = filteredElements.map { case (rowID, _) => rowID }.distinct().collect()
+ // column ids are always taken from the RDD passed in
+ // todo: an optimization is to pass in a dictionary of column ids if it is the same as an existing one
val columnIDs = filteredElements.map { case (_, columnID) => columnID }.distinct().collect()
- // create BiDictionary(s) for bi-directional lookup of ID by either Mahout ID or external ID
- // broadcast them for access in distributed processes, so they are not recalculated in every task.
- //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs)
- //val rowIDDictionary = if (existingRowIDs.isEmpty)
- // case Some(d) => d
- // case None => new BiDictionary(filteredElements.map { case (rowID, _) => rowID }.distinct().collect())
- // }
- //val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
-
val columnIDDictionary = new BiDictionary(keys = columnIDs)
val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary)
http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
index 945b443..2d74f7d 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
@@ -301,6 +301,26 @@ class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with Distributed
assert(aID.matrix.nrow == 4)
assert(bID.matrix.nrow == 4)
+ assert(aID.rowIDs.contains("u1") &&
+ aID.rowIDs.contains("u2") &&
+ aID.rowIDs.contains("u3") &&
+ aID.rowIDs.contains("u4"))
+
+ assert(aID.columnIDs.contains("a1") &&
+ aID.columnIDs.contains("a2") &&
+ aID.columnIDs.contains("a3") &&
+ aID.columnIDs.contains("a4") &&
+ aID.columnIDs.contains("a5"))
+
+ assert(bID.rowIDs.contains("u1") &&
+ bID.rowIDs.contains("u2") &&
+ bID.rowIDs.contains("u3") &&
+ bID.rowIDs.contains("u4"))
+ assert(bID.columnIDs.contains("b1") &&
+ bID.columnIDs.contains("b2") &&
+ bID.columnIDs.contains("b3") &&
+ bID.columnIDs.contains("b4"))
+
assert(!bID.rowIDs.contains("u5"))// this row id should be filtered out of the drm and dictionary
assert(!bID.columnIDs.contains("b25"))// this column id should be filtered out of the drm and dictionary