You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/20 07:36:59 UTC
[16/32] mahout git commit: MAHOUT-1570: rebased to latest upstream
MAHOUT-1570: rebased to latest upstream
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8de8b798
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8de8b798
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8de8b798
Branch: refs/heads/flink-binding
Commit: 8de8b798f0ffa374c2b18e00b8130ac0a0d8e918
Parents: 08ad113
Author: Alexey Grigorev <al...@gmail.com>
Authored: Wed Jun 24 14:30:58 2015 +0200
Committer: Alexey Grigorev <al...@gmail.com>
Committed: Fri Sep 25 17:41:53 2015 +0200
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkByteBCast.scala | 4 +++
.../mahout/flinkbindings/FlinkEngine.scala | 19 ++++++++++++++
.../apache/mahout/flinkbindings/package.scala | 26 ++++++++++++--------
.../flinkbindings/DistributedFlinkSuit.scala | 10 +++++---
4 files changed, 45 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
index 70d0545..1024452 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
@@ -55,6 +55,10 @@ class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with Seri
override def value: T = _value
+ override def close: Unit = {
+ // nothing to close
+ }
+
}
object FlinkByteBCast {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 18e17db..5039d21 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -235,4 +235,23 @@ object FlinkEngine extends DistributedEngine {
/** Creates empty DRM with non-trivial height */
override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
(implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
+
+
+ /**
+ * Convert a non-int-keyed matrix to an int-keyed one, optionally computing a mapping from old keys
+ * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
+ */
+ def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false):
+ (DrmLike[Int], Option[DrmLike[K]]) = ???
+
+ /**
+ * (Optional) Sampling operation. Consistent with the semantics of the equivalent Spark operation.
+ */
+ def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = ???
+
+ def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ???
+
+ /** Optional engine-specific all-reduce tensor operation. */
+ def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = ???
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index e46e605..955d8b1 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -18,24 +18,30 @@
*/
package org.apache.mahout
+import scala.Array.canBuildFrom
import scala.reflect.ClassTag
-import org.slf4j.LoggerFactory
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.ExecutionEnvironment
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.math.Vector
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.flinkbindings.FlinkDistributedContext
+import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
+import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.mahout.math._
import org.apache.mahout.math.DenseVector
import org.apache.mahout.math.Matrix
import org.apache.mahout.math.MatrixWritable
+import org.apache.mahout.math.Vector
import org.apache.mahout.math.VectorWritable
import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.mahout.math.drm.BlockifiedDrmTuple
+import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.drm.DrmTuple
+import org.slf4j.LoggerFactory
package object flinkbindings {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala
index 2295c26..126a8f4 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala
@@ -18,19 +18,21 @@
*/
package org.apache.mahout.flinkbindings
+import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.test.DistributedMahoutSuite
import org.scalatest.Suite
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.flink.api.java.ExecutionEnvironment
+
trait DistributedFlinkSuit extends DistributedMahoutSuite { this: Suite =>
protected implicit var mahoutCtx: DistributedContext = _
protected var env: ExecutionEnvironment = null
-
+
def initContext() {
env = ExecutionEnvironment.getExecutionEnvironment
- mahoutCtx = env
+ mahoutCtx = wrapContext(env)
}
override def beforeEach() {