You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/20 07:36:59 UTC

[16/32] mahout git commit: MAHOUT-1570: rebased to latest upstream

MAHOUT-1570: rebased to latest upstream


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8de8b798
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8de8b798
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8de8b798

Branch: refs/heads/flink-binding
Commit: 8de8b798f0ffa374c2b18e00b8130ac0a0d8e918
Parents: 08ad113
Author: Alexey Grigorev <al...@gmail.com>
Authored: Wed Jun 24 14:30:58 2015 +0200
Committer: Alexey Grigorev <al...@gmail.com>
Committed: Fri Sep 25 17:41:53 2015 +0200

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkByteBCast.scala   |  4 +++
 .../mahout/flinkbindings/FlinkEngine.scala      | 19 ++++++++++++++
 .../apache/mahout/flinkbindings/package.scala   | 26 ++++++++++++--------
 .../flinkbindings/DistributedFlinkSuit.scala    | 10 +++++---
 4 files changed, 45 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
index 70d0545..1024452 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
@@ -55,6 +55,10 @@ class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with Seri
 
   override def value: T = _value
 
+  override def close: Unit = {
+    // nothing to close
+  }
+
 }
 
 object FlinkByteBCast {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 18e17db..5039d21 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -235,4 +235,23 @@ object FlinkEngine extends DistributedEngine {
   /** Creates empty DRM with non-trivial height */
   override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
                                       (implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
+  
+
+  /**
+   * Convert a non-int-keyed matrix to an int-keyed one, optionally computing the mapping from
+   * old keys to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
+   */
+  def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): 
+          (DrmLike[Int], Option[DrmLike[K]]) = ???
+
+  /**
+   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+   */
+  def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = ???
+
+  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ???
+
+  /** Optional engine-specific all-reduce tensor operation. */
+  def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = ???
+ 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index e46e605..955d8b1 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -18,24 +18,30 @@
  */
 package org.apache.mahout
 
+import scala.Array.canBuildFrom
 import scala.reflect.ClassTag
-import org.slf4j.LoggerFactory
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.ExecutionEnvironment
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.math.Vector
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.FlinkDistributedContext
+import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
+import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.mahout.math._
 import org.apache.mahout.math.DenseVector
 import org.apache.mahout.math.Matrix
 import org.apache.mahout.math.MatrixWritable
+import org.apache.mahout.math.Vector
 import org.apache.mahout.math.VectorWritable
 import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.mahout.math.drm.BlockifiedDrmTuple
+import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.drm.DrmTuple
+import org.slf4j.LoggerFactory
 
 package object flinkbindings {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala
index 2295c26..126a8f4 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala
@@ -18,19 +18,21 @@
  */
 package org.apache.mahout.flinkbindings
 
+import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.math.drm.DistributedContext
 import org.apache.mahout.test.DistributedMahoutSuite
 import org.scalatest.Suite
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.flink.api.java.ExecutionEnvironment
+
 
 trait DistributedFlinkSuit extends DistributedMahoutSuite { this: Suite =>
 
   protected implicit var mahoutCtx: DistributedContext = _
   protected var env: ExecutionEnvironment = null
-  
+
   def initContext() {
     env = ExecutionEnvironment.getExecutionEnvironment
-    mahoutCtx = env
+    mahoutCtx = wrapContext(env)
   }
 
   override def beforeEach() {