Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/15 19:00:18 UTC
[2/2] mahout git commit: Major fixes for Flink backend merged
Major fixes for Flink backend merged
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/072289a4
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/072289a4
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/072289a4
Branch: refs/heads/flink-binding
Commit: 072289a46c9bd4b7297a17b621f7da30b94df1a7
Parents: 92a2f6c
Author: smarthi <sm...@apache.org>
Authored: Tue Mar 15 13:57:35 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Tue Mar 15 13:57:35 2016 -0400
----------------------------------------------------------------------
flink/pom.xml | 20 +-
.../mahout/flinkbindings/FlinkByteBCast.scala | 3 +
.../flinkbindings/FlinkDistributedContext.scala | 1 +
.../mahout/flinkbindings/FlinkEngine.scala | 325 ++++++++++++-------
.../mahout/flinkbindings/blas/FlinkOpAewB.scala | 58 ++--
.../flinkbindings/blas/FlinkOpAewScalar.scala | 33 +-
.../mahout/flinkbindings/blas/FlinkOpAt.scala | 58 ++--
.../mahout/flinkbindings/blas/FlinkOpAtA.scala | 48 ++-
.../mahout/flinkbindings/blas/FlinkOpAtB.scala | 25 +-
.../mahout/flinkbindings/blas/FlinkOpAx.scala | 4 +-
.../flinkbindings/blas/FlinkOpCBind.scala | 136 ++++----
.../flinkbindings/blas/FlinkOpMapBlock.scala | 34 +-
.../flinkbindings/blas/FlinkOpRBind.scala | 5 +-
.../flinkbindings/blas/FlinkOpRowRange.scala | 20 +-
.../blas/FlinkOpTimesRightMatrix.scala | 54 ++-
.../mahout/flinkbindings/blas/package.scala | 60 ----
.../drm/CheckpointedFlinkDrm.scala | 92 +++++-
.../drm/CheckpointedFlinkDrmOps.scala | 1 -
.../mahout/flinkbindings/drm/FlinkDrm.scala | 58 ++--
.../mahout/flinkbindings/io/DrmMetadata.scala | 16 +-
.../flinkbindings/io/HDFSPathSearch.scala | 6 +-
.../flinkbindings/io/Hadoop1HDFSUtil.scala | 86 -----
.../flinkbindings/io/Hadoop2HDFSUtil.scala | 94 ++++++
.../apache/mahout/flinkbindings/package.scala | 22 +-
.../flinkbindings/DistributedFlinkSuite.scala | 2 +
.../mahout/flinkbindings/DrmLikeOpsSuite.scala | 1 -
.../flinkbindings/FailingTestsSuite.scala | 272 ++++++++++++++++
.../flinkbindings/FlinkByteBCastSuite.scala | 9 +-
.../mahout/flinkbindings/RLikeOpsSuite.scala | 1 -
.../mahout/flinkbindings/UseCasesSuite.scala | 1 -
.../mahout/flinkbindings/blas/LATestSuite.scala | 3 +-
.../DistributedDecompositionsSuite.scala | 1 -
.../standard/DrmLikeOpsSuite.scala | 1 -
.../flinkbindings/standard/DrmLikeSuite.scala | 1 -
.../standard/NaiveBayesTestSuite.scala | 11 +
.../standard/RLikeDrmOpsSuite.scala | 8 -
math-scala/pom.xml | 2 +-
.../math/drm/logical/OpTimesLeftMatrix.scala | 2 +-
.../org/apache/mahout/math/drm/package.scala | 4 +-
.../apache/mahout/math/scalabindings/MMul.scala | 45 ++-
.../DistributedDecompositionsSuiteBase.scala | 4 +-
.../mahout/math/drm/RLikeDrmOpsSuiteBase.scala | 3 +
.../mahout/drivers/MahoutSparkDriver.scala | 4 +-
.../drivers/MahoutSparkOptionParser.scala | 6 +-
.../mahout/drivers/RowSimilarityDriver.scala | 5 +-
.../drivers/TextDelimitedReaderWriter.scala | 10 +-
.../apache/mahout/drivers/TrainNBDriver.scala | 21 +-
.../sparkbindings/SparkDistributedContext.scala | 2 +-
.../apache/mahout/sparkbindings/blas/ABt.scala | 2 +-
.../test/DistributedSparkSuite.scala | 4 +-
50 files changed, 1023 insertions(+), 661 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 37f1dbf..2ccb558 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -103,14 +103,15 @@
</build>
<dependencies>
+
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
+ <artifactId>flink-runtime_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.10</artifactId>
+ <artifactId>flink-scala_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
@@ -120,15 +121,28 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.10</artifactId>
+ <artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.compat.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math-scala_${scala.compat.version}</artifactId>
</dependency>
+    <!-- enforce the current version of Kryo as of 0.10.1 -->
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.24.0</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-hdfs</artifactId>
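
(Sketch, not part of the diff: a quick way to confirm which Kryo version actually wins on the
runtime classpath is a one-line check like the following; it may print null if the jar manifest
lacks an Implementation-Version attribute.)

    // inspect the Kryo version visible at runtime
    println(classOf[com.esotericsoftware.kryo.Kryo].getPackage.getImplementationVersion)
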
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
index 8544db0..5cdfb79 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
@@ -43,14 +43,17 @@ class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with Seri
if (streamType == FlinkByteBCast.StreamTypeVector) {
val writeable = new VectorWritable()
writeable.readFields(stream)
+ // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T])
writeable.get.asInstanceOf[T]
} else if (streamType == FlinkByteBCast.StreamTypeMatrix) {
val writeable = new MatrixWritable()
writeable.readFields(stream)
+ // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T])
writeable.get.asInstanceOf[T]
} else {
throw new IllegalArgumentException(s"unexpected type tag $streamType")
}
+
}
override def value: T = _value
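
(Sketch, not part of the commit: FlinkByteBCast moves the broadcast value around as a byte array
with a leading type tag and decodes it through the Hadoop Writable machinery shown above. The
round trip in isolation, assuming the usual Mahout scalabindings imports:)

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.mahout.math.VectorWritable
    import org.apache.mahout.math.scalabindings._

    // write a vector to bytes...
    val baos = new ByteArrayOutputStream()
    new VectorWritable(dvec(1.0, 2.0, 3.0)).write(new DataOutputStream(baos))

    // ...and read it back, as broadcastValue does after checking the type tag
    val w = new VectorWritable()
    w.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray)))
    val v = w.get()
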
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
index c818030..cfc9209 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -26,6 +26,7 @@ class FlinkDistributedContext(val env: ExecutionEnvironment) extends Distributed
val engine: DistributedEngine = FlinkEngine
+
override def close() {
// TODO
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index d03aef7..f848c3f 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -18,32 +18,29 @@
*/
package org.apache.mahout.flinkbindings
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.hadoop.io.Writable
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils.DataSetUtils
+import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.mahout.flinkbindings.blas._
import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.flinkbindings.io.HDFSUtil
-import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil
+import org.apache.mahout.flinkbindings.io.{HDFSUtil, Hadoop2HDFSUtil}
import org.apache.mahout.math._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.math.indexeddataset.BiDictionary
-import org.apache.mahout.math.indexeddataset.IndexedDataset
-import org.apache.mahout.math.indexeddataset.Schema
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset, Schema}
import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
-import org.apache.flink.api.scala._
-
+import scala.collection.JavaConversions._
+import scala.reflect._
object FlinkEngine extends DistributedEngine {
- // By default, use Hadoop 1 utils
- var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+ // By default, use Hadoop 2 utils
+ var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
/**
* Load DRM from hdfs (as in Mahout DRM format).
@@ -51,7 +48,7 @@ object FlinkEngine extends DistributedEngine {
* @param path The DFS path to load from
* @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
*/
- override def drmDfsRead(path: String, parMin: Int = 0)
+ override def drmDfsRead(path: String, parMin: Int = 1)
(implicit dc: DistributedContext): CheckpointedDrm[_] = {
// Require that context is actually Flink context.
@@ -60,19 +57,47 @@ object FlinkEngine extends DistributedEngine {
// Extract the Flink Environment variable
implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
- val metadata = hdfsUtils.readDrmHeader(path)
+ // set the parallelism of the env to parMin
+ env.setParallelism(parMin)
- val unwrapKey = metadata.unwrapKeyFunction
+ // get the header of a SequenceFile in the path
+ val metadata = hdfsUtils.readDrmHeader(path + "//")
- val ds = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path)
+ val keyClass: Class[_] = metadata.keyTypeWritable
- val res = ds.map(new MapFunction[(Writable, VectorWritable), (Any, Vector)] {
- def map(tuple: (Writable, VectorWritable)): (Any, Vector) = {
- (unwrapKey(tuple._1), tuple._2)
- }
- })
+ // from the header determine which function to use to unwrap the key
+ val unwrapKey = metadata.unwrapKeyFunction
+
+ // Map to the correct DrmLike based on the metadata information
+ if (metadata.keyClassTag == ClassTag.Int) {
+ val ds = env.readSequenceFile(classOf[IntWritable], classOf[VectorWritable], path)
+
+ val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Any, Vector)] {
+ def map(tuple: (IntWritable, VectorWritable)): (Any, Vector) = {
+ (unwrapKey(tuple._1), tuple._2.get())
+ }
+ })
+ datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+ } else if (metadata.keyClassTag == ClassTag.Long) {
+ val ds = env.readSequenceFile(classOf[LongWritable], classOf[VectorWritable], path)
+
+ val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Any, Vector)] {
+ def map(tuple: (LongWritable, VectorWritable)): (Any, Vector) = {
+ (unwrapKey(tuple._1), tuple._2.get())
+ }
+ })
+ datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+ } else if (metadata.keyClassTag == ClassTag(classOf[String])) {
+ val ds = env.readSequenceFile(classOf[Text], classOf[VectorWritable], path)
+
+ val res = ds.map(new MapFunction[(Text, VectorWritable), (Any, Vector)] {
+ def map(tuple: (Text, VectorWritable)): (Any, Vector) = {
+ (unwrapKey(tuple._1), tuple._2.get())
+ }
+ })
+ datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+ } else throw new IllegalArgumentException(s"Unsupported DRM key type:${keyClass.getName}")
- datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])
}
override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
@@ -82,89 +107,114 @@ object FlinkEngine extends DistributedEngine {
(implicit sc: DistributedContext): IndexedDataset = ???
+ /**
+ * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+ *
+ * A particular physical engine implementation may choose to either use or not use these rewrites
+ * as a useful basic rewriting rule.<P>
+ */
+ override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
+
/**
* Translates logical plan into Flink execution plan.
**/
override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
// Flink-specific Physical Plan translation.
+
+ implicit val typeInformation = generateTypeInformation[K]
val drm = flinkTranslate(plan)
val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
+ // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
newcp.cache()
}
- private def flinkTranslate[K: ClassTag](oper: DrmLike[K]): FlinkDrm[K] = oper match {
- case op @ OpAx(a, x) => FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)(op.classTagA))
- case op @ OpAt(a) => FlinkOpAt.sparseTrick(op, flinkTranslate(a)(op.classTagA))
- case op @ OpAtx(a, x) =>
- // express Atx as (A.t) %*% x
- // TODO: create specific implementation of Atx, see MAHOUT-1749
- val opAt = OpAt(a)
- val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA))
- val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
- val opAx = OpAx(atCast, x)
- FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA))
- case op @ OpAtB(a, b) => FlinkOpAtB.notZippable(op, flinkTranslate(a)(op.classTagA),
- flinkTranslate(b)(op.classTagA))
- case op @ OpABt(a, b) =>
- // express ABt via AtB: let C=At and D=Bt, and calculate CtD
- // TODO: create specific implementation of ABt, see MAHOUT-1750
- val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
- val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
- val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol)
-
- val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
- val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
- val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow=opBt.nrow, _ncol=opBt.ncol)
-
- FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d))
- .asInstanceOf[FlinkDrm[K]]
- case op @ OpAtA(a) => FlinkOpAtA.at_a(op, flinkTranslate(a)(op.classTagA))
- case op @ OpTimesRightMatrix(a, b) =>
- FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b)
- case op @ OpAewUnaryFunc(a, _, _) =>
- FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA))
- case op @ OpAewUnaryFuncFusion(a, _) =>
- FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA))
- // deprecated
- case op @ OpAewScalar(a, scalar, _) =>
- FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar)
- case op @ OpAewB(a, b, _) =>
- FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
- case op @ OpCbind(a, b) =>
- FlinkOpCBind.cbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
- case op @ OpRbind(a, b) =>
- FlinkOpRBind.rbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA))
- case op @ OpCbindScalar(a, x, _) =>
- FlinkOpCBind.cbindScalar(op, flinkTranslate(a)(op.classTagA), x)
- case op @ OpRowRange(a, _) =>
- FlinkOpRowRange.slice(op, flinkTranslate(a)(op.classTagA))
- case op @ OpABAnyKey(a, b) if extractRealClassTag(a) != extractRealClassTag(b) =>
- throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them")
- case op: OpMapBlock[K, _] =>
- FlinkOpMapBlock.apply(flinkTranslate(op.A)(op.classTagA), op.ncol, op.bmf)
- case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol)
- case _ => throw new NotImplementedError(s"operator $oper is not implemented yet")
+ private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
+ implicit val kTag = oper.keyClassTag
+ implicit val typeInformation = generateTypeInformation[K]
+ oper match {
+ case OpAtAnyKey(_) ⇒
+ throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
+ case op@OpAx(a, x) ⇒
+ //implicit val typeInformation = generateTypeInformation[K]
+ FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
+ case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+ case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
+ // express Atx as (A.t) %*% x
+ // TODO: create specific implementation of Atx, see MAHOUT-1749
+ val opAt = OpAt(a)
+ val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a))
+ val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
+ val opAx = OpAx(atCast, x)
+ FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]]
+ case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a),
+ flinkTranslate(b)).asInstanceOf[FlinkDrm[K]]
+ case op@OpABt(a, b) ⇒
+ // express ABt via AtB: let C=At and D=Bt, and calculate CtD
+ // TODO: create specific implementation of ABt, see MAHOUT-1750
+ val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
+ val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
+ val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
+ val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
+ val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
+ val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol)
+ FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]]
+ case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+ case op@OpTimesRightMatrix(a, b) ⇒
+ FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a), b)
+ case op@OpAewUnaryFunc(a, _, _) ⇒
+ FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a))
+ case op@OpAewUnaryFuncFusion(a, _) ⇒
+ FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a))
+ // deprecated
+ case op@OpAewScalar(a, scalar, _) ⇒
+ FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a), scalar)
+ case op@OpAewB(a, b, _) ⇒
+ FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a), flinkTranslate(b))
+ case op@OpCbind(a, b) ⇒
+ FlinkOpCBind.cbind(op, flinkTranslate(a), flinkTranslate(b))
+ case op@OpRbind(a, b) ⇒
+ FlinkOpRBind.rbind(op, flinkTranslate(a), flinkTranslate(b))
+ case op@OpCbindScalar(a, x, _) ⇒
+ FlinkOpCBind.cbindScalar(op, flinkTranslate(a), x)
+ case op@OpRowRange(a, _) ⇒
+ FlinkOpRowRange.slice(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+ case op@OpABAnyKey(a, b) if a.keyClassTag != b.keyClassTag ⇒
+ throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them")
+ case op: OpMapBlock[K, _] ⇒
+ FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op).asInstanceOf[FlinkDrm[K]]
+ case cp: CheckpointedFlinkDrm[K] ⇒
+ //implicit val ktag=cp.keyClassTag
+ new RowsFlinkDrm[K](cp.ds, cp.ncol)
+ case _ ⇒
+ throw new NotImplementedError(s"operator $oper is not implemented yet")
+ }
}
/**
* returns a vector that contains a column-wise sum from DRM
*/
- override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
- val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] {
- def map(tuple: (K, Vector)): Vector = tuple._2
- }).reduce(new ReduceFunction[Vector] {
- def reduce(v1: Vector, v2: Vector) = v1 + v2
- })
+ override def colSums[K](drm: CheckpointedDrm[K]): Vector = {
+ implicit val kTag: ClassTag[K] = drm.keyClassTag
+ implicit val typeInformation = generateTypeInformation[K]
+
+
+ val sum = drm.ds.map {
+ tuple => tuple._2
+ }.reduce(_ + _)
val list = sum.collect
list.head
}
/** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
- override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
- val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
- def map(tuple: (Array[K], Matrix)): Vector = {
- val (_, block) = tuple
+ override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
+ implicit val kTag: ClassTag[K] = drm.keyClassTag
+ implicit val typeInformation = generateTypeInformation[K]
+
+
+ val result = drm.asBlockified.ds.map {
+ tuple =>
+ val block = tuple._2
val acc = block(0, ::).like()
block.foreach { v =>
@@ -172,10 +222,7 @@ object FlinkEngine extends DistributedEngine {
}
acc
- }
- }).reduce(new ReduceFunction[Vector] {
- def reduce(v1: Vector, v2: Vector) = v1 + v2
- })
+ }.reduce(_ + _)
val list = result.collect
list.head
@@ -184,21 +231,30 @@ object FlinkEngine extends DistributedEngine {
/**
* returns a vector that contains a column-wise mean from DRM
*/
- override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+ override def colMeans[K](drm: CheckpointedDrm[K]): Vector = {
drm.colSums() / drm.nrow
}
/**
* Calculates the element-wise squared norm of a matrix
*/
- override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = {
- val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
- def map(tuple: (K, Vector)): Double = tuple match {
+ override def norm[K](drm: CheckpointedDrm[K]): Double = {
+ implicit val kTag: ClassTag[K] = drm.keyClassTag
+ implicit val typeInformation = generateTypeInformation[K]
+
+ val sumOfSquares = drm.ds.map {
+ tuple => tuple match {
case (idx, vec) => vec dot vec
}
- }).reduce(new ReduceFunction[Double] {
- def reduce(v1: Double, v2: Double) = v1 + v2
- })
+ }.reduce(_ + _)
+
+// val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
+// def map(tuple: (K, Vector)): Double = tuple match {
+// case (idx, vec) => vec dot vec
+// }
+// }).reduce(new ReduceFunction[Double] {
+// def reduce(v1: Double, v2: Double) = v1 + v2
+// })
val list = sumOfSquares.collect
list.head
@@ -214,7 +270,7 @@ object FlinkEngine extends DistributedEngine {
FlinkByteBCast.wrap(m)
- /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
+  /** Parallelize in-core matrix as a Flink distributed matrix, using row ordinal indices as data set keys. */
override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
(implicit dc: DistributedContext): CheckpointedDrm[Int] = {
@@ -223,14 +279,19 @@ object FlinkEngine extends DistributedEngine {
new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
}
+
private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
(implicit dc: DistributedContext): DrmDataSet[Int] = {
- val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
+ val rows = (0 until m.nrow).map(i => (i, m(i, ::)))//.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
val dataSetType = TypeExtractor.getForObject(rows.head)
- dc.env.fromCollection(rows).setParallelism(parallelismDegree)
+ //TODO: Make Sure that this is the correct partitioning scheme
+ dc.env.fromCollection(rows)
+ .partitionByRange(0)
+ .setParallelism(parallelismDegree)
+ .rebalance()
}
- /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
+  /** Parallelize in-core matrix as a Flink distributed matrix, using row labels as data set keys. */
override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
(implicit dc: DistributedContext): CheckpointedDrm[String] = {
???
@@ -247,7 +308,7 @@ object FlinkEngine extends DistributedEngine {
for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
}
val result = dc.env.fromCollection(nonParallelResult)
- new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol)
+ new CheckpointedFlinkDrm[Int](ds=result, _nrow=nrow, _ncol=ncol)
}
/** Creates empty DRM with non-trivial height */
@@ -259,29 +320,53 @@ object FlinkEngine extends DistributedEngine {
* Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
* to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
*/
- def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false):
+ def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false):
(DrmLike[Int], Option[DrmLike[K]]) = ???
/**
- * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+ * (Optional) Sampling operation.
*/
- def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = ???
-
- def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ???
-
-// def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = {
-//
-// val ncol = drmX match {
-// case cp: CheckpointedFlinkDrm[K] ⇒ cp.ncol
-// case _ ⇒ -1
-// }
-//
-// val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples)
-//
-// }
+ def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = {
+ implicit val kTag: ClassTag[K] = drmX.keyClassTag
+ implicit val typeInformation = generateTypeInformation[K]
+
+ val sample = DataSetUtils(drmX.dataset).sample(replacement, fraction)
+ new CheckpointedFlinkDrm[K](sample)
+ }
+
+ def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = {
+ implicit val kTag: ClassTag[K] = drmX.keyClassTag
+ implicit val typeInformation = generateTypeInformation[K]
+
+ val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, numSamples)
+ new CheckpointedFlinkDrm[K](sample)
+ }
/** Optional engine-specific all reduce tensor operation. */
- def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix =
+ def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix =
throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink")
-
+
+// private def generateTypeInformation[K]: TypeInformation[K] = {
+// val tag = implicitly[K].asInstanceOf[ClassTag[K]]
+// generateTypeInformationFromTag(tag)
+// }
+ private def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
+ val tag = implicitly[ClassTag[K]]
+
+ generateTypeInformationFromTag(tag)
+ }
+
+ private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
+ if (tag.runtimeClass.equals(classOf[Int])) {
+ createTypeInformation[Int].asInstanceOf[TypeInformation[K]]
+ } else if (tag.runtimeClass.equals(classOf[Long])) {
+ createTypeInformation[Long].asInstanceOf[TypeInformation[K]]
+ } else if (tag.runtimeClass.equals(classOf[String])) {
+ createTypeInformation[String].asInstanceOf[TypeInformation[K]]
+// } else if (tag.runtimeClass.equals(classOf[Any])) {
+// createTypeInformation[Any].asInstanceOf[TypeInformation[K]]
+ } else {
+ throw new IllegalArgumentException(s"index type $tag is not supported")
+ }
+ }
}
\ No newline at end of file
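
(Sketch, not part of the commit: Flink's Scala DataSet operators need an implicit
TypeInformation for the row key type, which the engine now derives from the DRM's runtime
ClassTag. A standalone equivalent of the generateTypeInformationFromTag dispatch above:)

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.scala.createTypeInformation
    import scala.reflect.ClassTag

    // only Int, Long and String row keys are supported; anything else fails fast
    def typeInfoFor[K](tag: ClassTag[K]): TypeInformation[K] = tag match {
      case ClassTag.Int                        => createTypeInformation[Int].asInstanceOf[TypeInformation[K]]
      case ClassTag.Long                       => createTypeInformation[Long].asInstanceOf[TypeInformation[K]]
      case t if t == ClassTag(classOf[String]) => createTypeInformation[String].asInstanceOf[TypeInformation[K]]
      case _                                   => throw new IllegalArgumentException(s"index type $tag is not supported")
    }
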
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
index f879e86..c61074b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
@@ -1,56 +1,42 @@
package org.apache.mahout.flinkbindings.blas
-import java.lang.Iterable
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm}
import org.apache.mahout.math.Vector
import org.apache.mahout.math.drm.logical.OpAewB
import org.apache.mahout.math.scalabindings.RLikeOps._
-import com.google.common.collect.Lists
-
/**
* Implementation is inspired by Spark-binding's OpAewB
* (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala)
*/
object FlinkOpAewB {
- def rowWiseJoinNoSideEffect[K: ClassTag](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+ def rowWiseJoinNoSideEffect[K: TypeInformation](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
val function = AewBOpsCloning.strToFunction(op.op)
- val classTag = extractRealClassTag(op.A)
- val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]])
-
- val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
- val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-
- val res: DataSet[(Any, Vector)] =
- rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
- .`with`(new CoGroupFunction[(_, Vector), (_, Vector), (_, Vector)] {
- def coGroup(it1java: Iterable[(_, Vector)], it2java: Iterable[(_, Vector)],
- out: Collector[(_, Vector)]): Unit = {
- val it1 = Lists.newArrayList(it1java).asScala
- val it2 = Lists.newArrayList(it2java).asScala
-
- if (it1.nonEmpty && it2.nonEmpty) {
- val (idx, a) = it1.head
- val (_, b) = it2.head
- out.collect((idx, function(a, b)))
- } else if (it1.isEmpty && it2.nonEmpty) {
- out.collect(it2.head)
- } else if (it1.nonEmpty && it2.isEmpty) {
- out.collect(it1.head)
- }
+ val rowsA = A.asRowWise.ds
+ val rowsB = B.asRowWise.ds
+ implicit val kTag = op.keyClassTag
+
+ val res: DataSet[(K, Vector)] =
+ rowsA
+ .coGroup(rowsB)
+ .where(0)
+ .equalTo(0) {
+ (left, right, out: Collector[(K, Vector)]) =>
+ (left.toIterable.headOption, right.toIterable.headOption) match {
+ case (Some((idx, a)), Some((_, b))) => out.collect((idx, function(a, b)))
+ case (None, Some(b)) => out.collect(b)
+ case (Some(a), None) => out.collect(a)
+            case (None, None) => throw new RuntimeException("At least one side of the coGroup " +
+ "must be non-empty.")
+ }
}
- })
+
new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol)
}
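
(Sketch, not part of the commit: the Scala coGroup above keeps unmatched rows from either side
and combines matched ones. The same semantics on a toy DataSet, using the same Flink Scala API:)

    import org.apache.flink.api.scala._
    import org.apache.flink.util.Collector

    object CoGroupSketch extends App {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val a = env.fromElements(1 -> 1.0, 2 -> 2.0)
      val b = env.fromElements(2 -> 10.0, 3 -> 30.0)
      val sum = a.coGroup(b).where(0).equalTo(0) {
        (left, right, out: Collector[(Int, Double)]) =>
          (left.toIterable.headOption, right.toIterable.headOption) match {
            case (Some((k, x)), Some((_, y))) => out.collect(k -> (x + y))
            case (Some(kv), None)             => out.collect(kv)
            case (None, Some(kv))             => out.collect(kv)
            case (None, None)                 => // coGroup never emits two empty sides
          }
      }
      sum.print() // (1,1.0), (2,12.0), (3,30.0)
    }
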
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index 67d710b..56e7deb 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -19,6 +19,7 @@
package org.apache.mahout.flinkbindings.blas
import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
import org.apache.mahout.math.Matrix
import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, TEwFunc}
@@ -39,43 +40,45 @@ object FlinkOpAewScalar {
private def isInplace = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean
@Deprecated
- def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
+ def opScalarNoSideEffect[K: TypeInformation](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
val function = EWOpsCloning.strToFunction(op.op)
+ implicit val kTag = op.keyClassTag
- val res = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
- def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
- case (keys, mat) => (keys, function(mat, scalar))
- }
- })
+
+ val res = A.asBlockified.ds.map{
+ tuple => (tuple._1, function(tuple._2, scalar))
+ }
new BlockifiedFlinkDrm(res, op.ncol)
}
- def opUnaryFunction[K: ClassTag](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = {
+ def opUnaryFunction[K: TypeInformation](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = {
val f = op.f
val inplace = isInplace
+
+ implicit val kTag = op.keyClassTag
+
val res = if (op.evalZeros) {
- A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
- def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
+ A.asBlockified.ds.map{
+ tuple =>
val (keys, block) = tuple
val newBlock = if (inplace) block else block.cloned
newBlock := ((_, _, x) => f(x))
(keys, newBlock)
- }
- })
+ }
} else {
- A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
- def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
+ A.asBlockified.ds.map{
+ tuple =>
val (keys, block) = tuple
val newBlock = if (inplace) block else block.cloned
for (row <- newBlock; el <- row.nonZeroes) el := f(el.get)
(keys, newBlock)
- }
- })
+ }
}
new BlockifiedFlinkDrm(res, op.ncol)
+
}
}
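
(Sketch, not part of the commit: opUnaryFunction picks between two traversals, and when the
function maps zero to zero (evalZeros == false) it visits only the non-zero cells. In-core,
that branch behaves like this:)

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val m = dense((0.0, 1.0, 0.0), (4.0, 0.0, 9.0))
    // visit only the non-zero cells, leaving the zeros untouched
    for (row <- m; el <- row.nonZeroes) el := math.sqrt(el.get)
    // m is now {{0, 1, 0}, {2, 0, 3}}
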
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
index e515b34..6e320af 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
@@ -18,25 +18,13 @@
*/
package org.apache.mahout.flinkbindings.blas
-import java.lang.Iterable
-
-import scala.Array.canBuildFrom
-import scala.collection.JavaConverters.asScalaBufferConverter
-
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.shaded.com.google.common.collect.Lists
-import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.SequentialAccessSparseVector
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.drm.DrmTuple
+import org.apache.flink.api.scala._
+import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm}
+import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}
import org.apache.mahout.math.drm.logical.OpAt
import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.flink.api.scala._
+import scala.Array.canBuildFrom
/**
* Implementation is taken from Spark's At
@@ -52,34 +40,34 @@ object FlinkOpAt {
def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = {
val ncol = op.ncol // # of rows of A, i.e. # of columns of A^T
- val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
- def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match {
- case (keys, block) =>
- (0 until block.ncol).map(columnIdx => {
+ val sparseParts = A.asBlockified.ds.flatMap {
+ blockifiedTuple =>
+ val keys = blockifiedTuple._1
+ val block = blockifiedTuple._2
+
+ (0 until block.ncol).map {
+ columnIndex =>
val columnVector: Vector = new SequentialAccessSparseVector(ncol)
- keys.zipWithIndex.foreach { case (key, idx) =>
- columnVector(key) = block(idx, columnIdx)
+ keys.zipWithIndex.foreach {
+ case (key, idx) => columnVector(key) = block(idx, columnIndex)
}
- out.collect((columnIdx, columnVector))
- })
- }
- })
+ (columnIndex, columnVector)
+ }
+ }
- val regrouped = sparseParts.groupBy(selector[Vector, Int])
+ val regrouped = sparseParts.groupBy(0)
- val sparseTotal = regrouped.reduceGroup(new GroupReduceFunction[(Int, Vector), DrmTuple[Int]] {
- def reduce(values: Iterable[(Int, Vector)], out: Collector[DrmTuple[Int]]): Unit = {
- val it = Lists.newArrayList(values).asScala
- val (idx, _) = it.head
- val vector = (it map { case (idx, vec) => vec }).sum
- out.collect((idx, vector))
- }
- })
+ val sparseTotal = regrouped.reduce{
+ (left, right) =>
+ (left._1, left._2 + right._2)
+ }
// TODO: densify or not?
new RowsFlinkDrm(sparseTotal, ncol)
}
+
+
}
\ No newline at end of file
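
(Sketch, not part of the commit: sparseTrick transposes by emitting one
(columnIndex, sparse column) pair per column of each block and then summing the partial
columns that share an index. The per-block step on a single dense block:)

    import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val a = dense((1.0, 2.0, 3.0), (4.0, 5.0, 6.0)) // 2 x 3
    val parts = (0 until a.ncol).map { c =>
      val col: Vector = new SequentialAccessSparseVector(a.nrow)
      (0 until a.nrow).foreach(r => col(r) = a(r, c))
      c -> col // row c of A^T; with several blocks these are summed per key
    }
    parts.foreach { case (c, col) => assert((col - a.t(c, ::)).norm(2) == 0.0) }
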
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index 629857a..bdb0e5e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -2,22 +2,20 @@ package org.apache.mahout.flinkbindings.blas
import java.lang.Iterable
-import scala.collection.JavaConverters._
-
import org.apache.flink.api.common.functions._
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.shaded.com.google.common.collect.Lists
import org.apache.flink.util.Collector
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm._
import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.BlockifiedDrmTuple
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import scala.collection.JavaConverters._
/**
* Inspired by Spark's implementation from
@@ -29,61 +27,59 @@ object FlinkOpAtA {
final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol"
final val PROPERTY_ATA_MAXINMEMNCOL_DEFAULT = "200"
- def at_a(op: OpAtA[_], A: FlinkDrm[_]): FlinkDrm[Int] = {
+ def at_a[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
val maxInMemStr = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, PROPERTY_ATA_MAXINMEMNCOL_DEFAULT)
val maxInMemNCol = maxInMemStr.toInt
maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer")
+ implicit val kTag = A.classTag
+
if (op.ncol <= maxInMemNCol) {
implicit val ctx = A.context
val inCoreAtA = slim(op, A)
val result = drmParallelize(inCoreAtA, numPartitions = 1)
result
} else {
- fat(op.asInstanceOf[OpAtA[Any]], A.asInstanceOf[FlinkDrm[Any]])
+ fat(op.asInstanceOf[OpAtA[K]], A.asInstanceOf[FlinkDrm[K]])
}
}
- def slim(op: OpAtA[_], A: FlinkDrm[_]): Matrix = {
- val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]]
+ def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = {
+ val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[K], Matrix)]]
- val res = ds.map(new MapFunction[(Array[Any], Matrix), Matrix] {
+ val res = ds.map {
// TODO: optimize it: use upper-triangle matrices like in Spark
- def map(block: (Array[Any], Matrix)): Matrix = block match {
- case (idx, m) => m.t %*% m
- }
- }).reduce(new ReduceFunction[Matrix] {
- def reduce(m1: Matrix, m2: Matrix) = m1 + m2
- }).collect()
+ block => block._2.t %*% block._2
+ }.reduce(_ + _).collect()
res.head
}
- def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = {
+ def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
val nrow = op.A.nrow
val ncol = op.A.ncol
val ds = A.asBlockified.ds
- val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[Any], Matrix), Int] {
- def map(a: (Array[Any], Matrix)): Int = 1
+ val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[K], Matrix), Int] {
+ def map(a: (Array[K], Matrix)): Int = 1
}).reduce(new ReduceFunction[Int] {
def reduce(a: Int, b: Int): Int = a + b
})
- val subresults: DataSet[(Int, Matrix)] =
- ds.flatMap(new RichFlatMapFunction[(Array[Any], Matrix), (Int, Matrix)] {
+ val subresults: DataSet[(Int, Matrix)] =
+ ds.flatMap(new RichFlatMapFunction[(Array[K], Matrix), (Int, Matrix)] {
var ranges: Array[Range] = null
override def open(params: Configuration): Unit = {
- val runtime = this.getRuntimeContext()
+ val runtime = this.getRuntimeContext
val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions")
val parts = dsX.get(0)
val numParts = estimatePartitions(nrow, ncol, parts)
ranges = computeEvenSplits(ncol, numParts)
}
- def flatMap(tuple: (Array[Any], Matrix), out: Collector[(Int, Matrix)]): Unit = {
+ def flatMap(tuple: (Array[K], Matrix), out: Collector[(Int, Matrix)]): Unit = {
val block = tuple._2
ranges.zipWithIndex.foreach { case (range, idx) =>
@@ -93,13 +89,13 @@ object FlinkOpAtA {
}).withBroadcastSet(numberOfPartitions, "numberOfPartitions")
- val res = subresults.groupBy(selector[Matrix, Int])
+ val res = subresults.groupBy(0)
.reduceGroup(new RichGroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
var ranges: Array[Range] = null
override def open(params: Configuration): Unit = {
- val runtime = this.getRuntimeContext()
+ val runtime = this.getRuntimeContext
val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions")
val parts = dsX.get(0)
val numParts = estimatePartitions(nrow, ncol, parts)
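
(Sketch, not part of the commit: the slim path computes A'A as the sum of the per-block
products block.t %*% block. The same reduction in-core, with a hypothetical two-block split:)

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val b1 = dense((1.0, 2.0), (3.0, 4.0)) // rows 0..1 of A
    val b2 = dense((5.0, 6.0))             // row 2 of A
    val ata = Seq(b1, b2).map(b => b.t %*% b).reduce(_ + _)
    // ata equals A.t %*% A for the stacked 3 x 2 matrix A
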
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index b514868..6a081ba 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -46,25 +46,22 @@ import org.apache.flink.api.scala._
*/
object FlinkOpAtB {
- def notZippable[K: ClassTag](op: OpAtB[K], At: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[Int] = {
- val classTag = extractRealClassTag(op.A)
- val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]])
+ def notZippable[A](op: OpAtB[A], At: FlinkDrm[A], B: FlinkDrm[A]): FlinkDrm[Int] = {
- val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
- val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
- val joined = rowsAt.join(rowsB).where(joiner).equalTo(joiner)
+ val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
+ val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
+ val joined = rowsAt.join(rowsB).where(0).equalTo(0)
val ncol = op.ncol
val nrow = op.nrow.toInt
val blockHeight = 10
val blockCount = safeToNonNegInt((nrow - 1) / blockHeight + 1)
- val preProduct: DataSet[(Int, Matrix)] =
- joined.flatMap(new FlatMapFunction[Tuple2[(_, Vector), (_, Vector)], (Int, Matrix)] {
- def flatMap(in: Tuple2[(_, Vector), (_, Vector)],
- out: Collector[(Int, Matrix)]): Unit = {
+ val preProduct: DataSet[(Int, Matrix)] =
+ joined.flatMap(new FlatMapFunction[((A, Vector), (A, Vector)), (Int, Matrix)] {
+ def flatMap(in: ((A, Vector), (A, Vector)), out: Collector[(Int, Matrix)]): Unit = {
val avec = in._1._2
- val bvec = in._1._2
+ val bvec = in._2._2
0.until(blockCount) map { blockKey =>
val blockStart = blockKey * blockHeight
@@ -72,13 +69,13 @@ object FlinkOpAtB {
val outer = avec(blockStart until blockEnd) cross bvec
out.collect(blockKey -> outer)
+ out
}
}
})
val res: BlockifiedDrmDataSet[Int] =
- preProduct.groupBy(selector[Matrix, Int])
- .reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
+ preProduct.groupBy(0).reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = {
val it = Lists.newArrayList(values).asScala
val (idx, _) = it.head
@@ -90,7 +87,7 @@ object FlinkOpAtB {
}
})
- new BlockifiedFlinkDrm(res, ncol)
+ new BlockifiedFlinkDrm[Int](res, ncol)
}
}
\ No newline at end of file
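
(Sketch, not part of the commit: notZippable builds A'B from the join of the rows of A' and B
on their keys, summing outer products of the paired rows. The underlying identity in-core:)

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val a = dense((1.0, 2.0), (3.0, 4.0))
    val b = dense((5.0, 6.0), (7.0, 8.0))
    val atb = (0 until a.nrow).map(i => a(i, ::) cross b(i, ::)).reduce(_ + _)
    // atb equals a.t %*% b
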
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index 4302457..79f5fe8 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -21,6 +21,7 @@ package org.apache.mahout.flinkbindings.blas
import java.util.List
import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
import org.apache.mahout.math.{Matrix, Vector}
@@ -38,8 +39,9 @@ import org.apache.flink.api.scala._
*/
object FlinkOpAx {
- def blockifiedBroadcastAx[K: ClassTag](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
+ def blockifiedBroadcastAx[K: TypeInformation](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
implicit val ctx = A.context
+ implicit val kTag = op.keyClassTag
val singletonDataSetX = ctx.env.fromElements(op.x)
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
index 6cf5e5c..65b2a25 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
@@ -19,12 +19,14 @@
package org.apache.mahout.flinkbindings.blas
import java.lang.Iterable
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm._
@@ -43,94 +45,86 @@ import org.apache.mahout.math.scalabindings._
*/
object FlinkOpCBind {
- def cbind[K: ClassTag](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+ def cbind[K: TypeInformation](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
val n = op.ncol
val n1 = op.A.ncol
val n2 = op.B.ncol
- val classTag = extractRealClassTag(op.A)
- val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]])
-
- val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
- val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]]
-
- val res: DataSet[(Any, Vector)] =
- rowsA.coGroup(rowsB).where(joiner).equalTo(joiner)
- .`with`(new CoGroupFunction[(_, Vector), (_, Vector), (_, Vector)] {
- def coGroup(it1java: Iterable[(_, Vector)], it2java: Iterable[(_, Vector)],
- out: Collector[(_, Vector)]): Unit = {
- val it1 = Lists.newArrayList(it1java).asScala
- val it2 = Lists.newArrayList(it2java).asScala
-
- if (it1.nonEmpty && it2.nonEmpty) {
- val (idx, a) = it1.head
- val (_, b) = it2.head
-
- val result: Vector = if (a.isDense && b.isDense) {
- new DenseVector(n)
- } else {
- new SequentialAccessSparseVector(n)
- }
-
- result(0 until n1) := a
- result(n1 until n) := b
-
- out.collect((idx, result))
- } else if (it1.isEmpty && it2.nonEmpty) {
- val (idx, b) = it2.head
- val result: Vector = if (b.isDense) {
- new DenseVector(n)
- } else {
- new SequentialAccessSparseVector(n)
- }
- result(n1 until n) := b
- out.collect((idx, result))
- } else if (it1.nonEmpty && it2.isEmpty) {
- val (idx, a) = it1.head
- val result: Vector = if (a.isDense) {
- new DenseVector(n)
- } else {
- new SequentialAccessSparseVector(n)
+ implicit val classTag = op.A.keyClassTag
+
+ val rowsA = A.asRowWise.ds
+ val rowsB = B.asRowWise.ds
+
+ val res: DataSet[(K, Vector)] =
+ rowsA.coGroup(rowsB).where(0).equalTo(0) {
+ (left, right) =>
+ (left.toIterable.headOption, right.toIterable.headOption) match {
+ case (Some((idx, a)), Some((_, b))) =>
+ val result = if (a.isDense && b.isDense) {
+ new DenseVector(n)
+ } else {
+ new SequentialAccessSparseVector(n)
+ }
+
+ result(0 until n1) := a
+ result(n1 until n) := b
+
+ (idx, result)
+ case (Some((idx, a)), None) =>
+ val result: Vector = if (a.isDense) {
+ new DenseVector(n)
+ } else {
+ new SequentialAccessSparseVector(n)
+ }
+ result(n1 until n) := a
+
+ (idx, result)
+ case (None, Some((idx, b))) =>
+ val result: Vector = if (b.isDense) {
+ new DenseVector(n)
+ } else {
+ new SequentialAccessSparseVector(n)
+ }
+ result(n1 until n) := b
+
+ (idx, result)
+ case (None, None) =>
+ throw new RuntimeException("CoGroup should have at least one non-empty input.")
}
- result(n1 until n) := a
- out.collect((idx, result))
- }
}
- })
new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol)
}
- def cbindScalar[K: ClassTag](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
+ def cbindScalar[K: TypeInformation](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
val left = op.leftBind
val ds = A.asBlockified.ds
- val out = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
- def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
- case (keys, mat) => (keys, cbind(mat, x, left))
- }
+ implicit val kTag= op.keyClassTag
- def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = {
- val ncol = mat.ncol
- val newMat = mat.like(mat.nrow, ncol + 1)
+ def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = {
+ val ncol = mat.ncol
+ val newMat = mat.like(mat.nrow, ncol + 1)
- if (left) {
- newMat.zip(mat).foreach { case (newVec, origVec) =>
- newVec(0) = x
- newVec(1 to ncol) := origVec
- }
- } else {
- newMat.zip(mat).foreach { case (newVec, origVec) =>
- newVec(ncol) = x
- newVec(0 to (ncol - 1)) := origVec
- }
+ if (left) {
+ newMat.zip(mat).foreach { case (newVec, origVec) =>
+ newVec(0) = x
+ newVec(1 to ncol) := origVec
+ }
+ } else {
+ newMat.zip(mat).foreach { case (newVec, origVec) =>
+ newVec(ncol) = x
+ newVec(0 to (ncol - 1)) := origVec
}
-
- newMat
}
- })
+
+ newMat
+ }
+
+ val out = A.asBlockified.ds.map {
+ tuple => (tuple._1, cbind(tuple._2, x, left))
+ }
new BlockifiedFlinkDrm(out, op.ncol)
}
-
}
\ No newline at end of file
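
(Sketch, not part of the commit: for a row key present on both sides, cbind writes A's cells
into slots 0 until n1 and B's into n1 until n of a fresh vector. One matched row in-core:)

    import org.apache.mahout.math.DenseVector
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val n1 = 2
    val n = 4
    val result = new DenseVector(n)
    result(0 until n1) := dvec(1.0, 2.0) // A's part
    result(n1 until n) := dvec(3.0, 4.0) // B's part
    // result is {1.0, 2.0, 3.0, 4.0}
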
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
index 9530d43..c3918a5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
@@ -18,13 +18,10 @@
*/
package org.apache.mahout.flinkbindings.blas
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.BlockMapFunc
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.drm.logical.OpMapBlock
import org.apache.mahout.math.scalabindings.RLikeOps._
/**
@@ -33,16 +30,19 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
*/
object FlinkOpMapBlock {
- def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = {
- val res = src.asBlockified.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] {
- def map(block: (Array[S], Matrix)): (Array[R], Matrix) = {
- val out = function(block)
- assert(out._2.nrow == block._2.nrow, "block mapping must return same number of rows.")
- assert(out._2.ncol == ncol, s"block map must return $ncol number of columns.")
- out
- }
- })
+ def apply[S, R: TypeInformation](src: FlinkDrm[S], ncol: Int, operator: OpMapBlock[S,R]): FlinkDrm[R] = {
+ implicit val rtag = operator.keyClassTag
+ val bmf = operator.bmf
+ val ncol = operator.ncol
+ val res = src.asBlockified.ds.map {
+ block =>
+ val result = bmf(block)
+        assert(result._2.nrow == block._2.nrow, "block mapping must return the same number of rows.")
+        assert(result._2.ncol == ncol, s"block map must return $ncol columns.")
+ // printf("Block partition: \n%s\n", block._2)
+ result
+ }
- new BlockifiedFlinkDrm(res, ncol)
+ new BlockifiedFlinkDrm[R](res, ncol)
}
}
\ No newline at end of file
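
(Sketch, not part of the commit: from the caller's side, the asserts above mean a block mapper
must keep each block's row count and produce the declared column count. A minimal conforming
mapper, with a hypothetical plusOne helper:)

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    def plusOne(drmA: DrmLike[Int]): DrmLike[Int] =
      drmA.mapBlock(ncol = drmA.ncol) { case (keys, block) =>
        keys -> (block += 1.0) // same keys, same shape: both asserts hold
      }
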
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
index 83beaa1..4fa2eaa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
@@ -18,6 +18,8 @@
*/
package org.apache.mahout.flinkbindings.blas
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
import scala.reflect.ClassTag
import org.apache.flink.api.scala.DataSet
@@ -28,8 +30,9 @@ import org.apache.mahout.math.drm.logical.OpRbind
object FlinkOpRBind {
- def rbind[K: ClassTag](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
+ def rbind[K: TypeInformation](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
// note that indexes of B are already re-arranged prior to executing this code
+ implicit val kTag = op.keyClassTag
val res = A.asRowWise.ds.union(B.asRowWise.ds)
new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
index 6e11892..39f4ceb 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
@@ -18,11 +18,9 @@
*/
package org.apache.mahout.flinkbindings.blas
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
import org.apache.mahout.flinkbindings.drm.FlinkDrm
import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.Vector
import org.apache.mahout.math.drm.logical.OpRowRange
/**
@@ -35,17 +33,13 @@ object FlinkOpRowRange {
val rowRange = op.rowRange
val firstIdx = rowRange.head
- val filtered = A.asRowWise.ds.filter(new FilterFunction[(Int, Vector)] {
- def filter(tuple: (Int, Vector)): Boolean = tuple match {
- case (idx, vec) => rowRange.contains(idx)
- }
- })
+ val filtered = A.asRowWise.ds.filter {
+ tuple => rowRange.contains(tuple._1)
+ }
- val res = filtered.map(new MapFunction[(Int, Vector), (Int, Vector)] {
- def map(tuple: (Int, Vector)): (Int, Vector) = tuple match {
- case (idx, vec) => (idx - firstIdx, vec)
- }
- })
+ val res = filtered.map {
+ tuple => (tuple._1 - firstIdx, tuple._2)
+ }
new RowsFlinkDrm(res, op.ncol)
}
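
(Sketch, not part of the commit: the slice is a filter on the key range followed by rebasing
the surviving keys to zero. The same operation on plain pairs:)

    val rows = Seq(0 -> "r0", 1 -> "r1", 2 -> "r2", 3 -> "r3")
    val rowRange = 1 until 3
    val sliced = rows.filter { case (idx, _) => rowRange.contains(idx) }
                     .map { case (idx, v) => (idx - rowRange.head, v) }
    // sliced == Seq(0 -> "r1", 1 -> "r2")
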
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
index 989fad1..4e5b1a7 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
@@ -18,17 +18,16 @@
*/
package org.apache.mahout.flinkbindings.blas
-import scala.reflect.ClassTag
-
import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
import org.apache.mahout.math.scalabindings.RLikeOps._
-
-import org.apache.flink.api.scala._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix}
/**
* Implementation is taken from Spark's OpTimesRightMatrix:
@@ -36,20 +35,51 @@ import org.apache.flink.api.scala._
*/
object FlinkOpTimesRightMatrix {
- def drmTimesInCore[K: ClassTag](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = {
+ def drmTimesInCore[K: TypeInformation](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = {
implicit val ctx = A.context
+ implicit val kTag = op.keyClassTag
+
- val singletonDataSetB = ctx.env.fromElements(inCoreB)
+ /* HACK: broadcasting the matrix via Flink's .withBroadcastSet(singletonDataSetB) triggers a Kryo
+ * issue in the backend, resulting in a StackOverflowError.
+ *
+ * The quick fix is to break the matrix down into a list of rows and rebuild it on the backend.
+ *
+ * TODO: this is obviously very inefficient; we need a proper broadcast of the matrix itself.
+ */
+
+ // val inCoreBcastB = FlinkEngine.drmBroadcast(inCoreB)
+ // val singletonDataSetB = ctx.env.fromElements(inCoreB)
+
+ val rows = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
+ val dataSetType = TypeExtractor.getForObject(rows.head)
+ val singletonDataSetB = ctx.env.fromCollection(rows)
val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
var inCoreB: Matrix = null
override def open(params: Configuration): Unit = {
val runtime = this.getRuntimeContext
- val dsB: java.util.List[Matrix] = runtime.getBroadcastVariable("matrix")
- inCoreB = dsB.get(0)
- }
+ // rebuild the broadcast matrix from its (index, row) pairs
+ val dsB: java.util.List[(Int, org.apache.mahout.math.Vector)] = runtime.getBroadcastVariable("matrix")
+ val m = dsB.size()
+ val n = dsB.get(0)._2.size
+ val isDense = dsB.get(0)._2.isDense
+ // preserve the density of the original matrix when rebuilding it
+ inCoreB = if (isDense) new DenseMatrix(m, n) else new SparseRowMatrix(m, n)
+ for (i <- 0 until m) {
+ inCoreB(i, ::) := dsB.get(i)._2
+ }
+
+ }
+
override def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
case (keys, block_A) => (keys, block_A %*% inCoreB)
}
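The workaround above ships the matrix as (row index, row vector) pairs and reassembles it inside open(). A minimal in-core sketch of that decompose/rebuild round trip, independent of Flink:

    import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix, Vector}
    import org.apache.mahout.math.scalabindings.RLikeOps._

    def decompose(b: Matrix): Seq[(Int, Vector)] =
      (0 until b.nrow).map(i => i -> b(i, ::))

    def rebuild(rows: Seq[(Int, Vector)]): Matrix = {
      val m = rows.size
      val n = rows.head._2.size
      // preserve density when rebuilding
      val b: Matrix = if (rows.head._2.isDense) new DenseMatrix(m, n) else new SparseRowMatrix(m, n)
      rows.foreach { case (i, v) => b(i, ::) := v }
      b
    }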
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
deleted file mode 100644
index 27f552c..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import scala.reflect.ClassTag
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-package object blas {
-
- // TODO: remove it once figure out how to make Flink accept interfaces (Vector here)
- def selector[V, K: ClassTag]: KeySelector[(K, V), K] = {
- val tag = implicitly[ClassTag[K]]
- if (tag.runtimeClass.equals(classOf[Int])) {
- tuple_1_int.asInstanceOf[KeySelector[(K, V), K]]
- } else if (tag.runtimeClass.equals(classOf[Long])) {
- tuple_1_long.asInstanceOf[KeySelector[(K, V), K]]
- } else if (tag.runtimeClass.equals(classOf[String])) {
- tuple_1_string.asInstanceOf[KeySelector[(K, V), K]]
- } else {
- throw new IllegalArgumentException(s"index type $tag is not supported")
- }
- }
-
- private def tuple_1_int[K: ClassTag] = new KeySelector[(Int, _), Int]
- with ResultTypeQueryable[Int] {
- def getKey(tuple: Tuple2[Int, _]): Int = tuple._1
- def getProducedType: TypeInformation[Int] = createTypeInformation[Int]
- }
-
- private def tuple_1_long[K: ClassTag] = new KeySelector[(Long, _), Long]
- with ResultTypeQueryable[Long] {
- def getKey(tuple: Tuple2[Long, _]): Long = tuple._1
- def getProducedType: TypeInformation[Long] = createTypeInformation[Long]
- }
-
- private def tuple_1_string[K: ClassTag] = new KeySelector[(String, _), String]
- with ResultTypeQueryable[String] {
- def getKey(tuple: Tuple2[String, _]): String = tuple._1
- def getProducedType: TypeInformation[String] = createTypeInformation[String]
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 96d57d2..84b327a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -19,13 +19,13 @@
package org.apache.mahout.flinkbindings.drm
import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
-import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
import org.apache.mahout.flinkbindings.{DrmDataSet, _}
-import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, Vector, VectorWritable}
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm.CacheHint._
import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _}
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.scalabindings._
@@ -37,6 +37,7 @@ import scala.util.Random
class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
+ override val cacheHint: CacheHint = CacheHint.NONE,
override protected[mahout] val partitioningTag: Long = Random.nextLong(),
private var _canHaveMissingRows: Boolean = false
) extends CheckpointedDrm[K] {
@@ -79,7 +80,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
- def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
+ def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
+ // caching is not yet implemented in the Flink backend, so checkpoint is currently a no-op
+ this
+ }
def collect: Matrix = {
val data = ds.collect()
@@ -123,21 +127,76 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
def dfsWrite(path: String): Unit = {
val env = ds.getExecutionEnvironment
- val keyTag = implicitly[ClassTag[K]]
- val convertKey = keyToWritableFunc(keyTag)
+ // ds.map does not pick up the correct runtime type of tuple._1;
+ // WritableTypeInfo throws an exception when the key is not an
+ // actual Writable but rather a subclass of it
- val writableDataset = ds.map(new MapFunction[(K, Vector), (Writable, VectorWritable)] {
- def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match {
- case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec))
- }
- })
+// val keyTag = implicitly[ClassTag[K]]
+// def convertKey = keyToWritableFunc(keyTag)
+// val writableDataset = ds.map {
+// tuple => (convertKey(tuple._1), new VectorWritable(tuple._2))
+// }
+
+
+ // test output with IntWritable Key. VectorWritable is not a problem,
+// val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] {
+// def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) =
+// (new IntWritable(1), new VectorWritable(tuple._2))
+// })
+
+
+ val keyTag = implicitly[ClassTag[K]]
val job = new JobConf
- val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable]
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
- val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job)
- writableDataset.output(hadoopOutput)
+ // explicitly define all Writable subclasses for ds.map() keys
+ // as well as the SequenceFileOutputFormat parameters
+ if (keyTag.runtimeClass == classOf[Int]) {
+ // explicitly map into Int keys
+ implicit val typeInformation = createTypeInformation[(IntWritable,VectorWritable)]
+ val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] {
+ def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) =
+ (new IntWritable(tuple._1.asInstanceOf[Int]), new VectorWritable(tuple._2))
+ })
+
+ // setup sink for IntWritable
+ job.setOutputKeyClass(classOf[IntWritable])
+ job.setOutputValueClass(classOf[VectorWritable])
+ val sequenceFormat = new SequenceFileOutputFormat[IntWritable, VectorWritable]
+ val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job)
+ writableDataset.output(hadoopOutput)
+
+ } else if (keyTag.runtimeClass == classOf[String]) {
+ // explicitly map into Text keys
+ val writableDataset = ds.map(new MapFunction[DrmTuple[K], (Text, VectorWritable)] {
+ def map(tuple: DrmTuple[K]): (Text, VectorWritable) =
+ (new Text(tuple._1.asInstanceOf[String]), new VectorWritable(tuple._2))
+ })
+
+ // setup sink for Text
+ job.setOutputKeyClass(classOf[Text])
+ job.setOutputValueClass(classOf[VectorWritable])
+ val sequenceFormat = new SequenceFileOutputFormat[Text, VectorWritable]
+ val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job)
+ writableDataset.output(hadoopOutput)
+
+ } else if (keyTag.runtimeClass == classOf[Long]) {
+ // explicitly map into Long keys
+ val writableDataset = ds.map(new MapFunction[DrmTuple[K], (LongWritable, VectorWritable)] {
+ def map(tuple: DrmTuple[K]): (LongWritable, VectorWritable) =
+ (new LongWritable(tuple._1.asInstanceOf[Long]), new VectorWritable(tuple._2))
+ })
+
+ // setup sink for LongWritable
+ job.setOutputKeyClass(classOf[LongWritable])
+ job.setOutputValueClass(classOf[VectorWritable])
+ val sequenceFormat = new SequenceFileOutputFormat[LongWritable, VectorWritable]
+ val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job)
+ writableDataset.output(hadoopOutput)
+
+ } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
env.execute(s"dfsWrite($path)")
}
@@ -148,9 +207,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
} else if (keyTag.runtimeClass == classOf[String]) {
(x: K) => new Text(x.asInstanceOf[String])
} else if (keyTag.runtimeClass == classOf[Long]) {
- (x: K) => new LongWritable(x.asInstanceOf[Long])
- } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) {
- (x: K) => x.asInstanceOf[Writable]
+ (x: K) => new LongWritable(x.asInstanceOf[Long])
+ // WritableTypeInfo will reject the base Writable class
+// } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) {
+// (x: K) => x.asInstanceOf[Writable]
} else {
throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
}
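A usage sketch of the rewritten writer, assuming an implicit FlinkDistributedContext is in scope and the HDFS URL (hypothetical here) is reachable:

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.scalabindings._

    val drmA = drmParallelize(dense((1, 2), (3, 4)))
    // Int keys take the first branch above and are written as IntWritable
    drmA.dfsWrite("hdfs://namenode:8020/tmp/drmA")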
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
index a037d44..e65c43d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
@@ -31,5 +31,4 @@ class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) {
/** Flink matrix customization exposure */
def dataset = flinkDrm.ds
-
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index c9c1b2c..aea62fa 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -18,18 +18,13 @@
*/
package org.apache.mahout.flinkbindings.drm
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.{FlatMapFunction, MapPartitionFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext}
-import org.apache.mahout.math.drm.DrmTuple
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix}
-import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.reflect.ClassTag
trait FlinkDrm[K] {
@@ -43,7 +38,7 @@ trait FlinkDrm[K] {
def classTag: ClassTag[K]
}
-class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+class RowsFlinkDrm[K: TypeInformation: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
def executionEnvironment = ds.getExecutionEnvironment
def context: FlinkDistributedContext = ds.getExecutionEnvironment
@@ -54,27 +49,27 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
val ncolLocal = ncol
val classTag = implicitly[ClassTag[K]]
- val parts = ds.mapPartition(new MapPartitionFunction[DrmTuple[K], (Array[K], Matrix)] {
- def mapPartition(values: Iterable[DrmTuple[K]], out: Collector[(Array[K], Matrix)]): Unit = {
- val it = values.asScala.seq
+ val parts = ds.mapPartition {
+ values =>
+ val (keys, vectors) = values.toIterable.unzip
- val (keys, vectors) = it.unzip
if (vectors.nonEmpty) {
- val isDense = vectors.head.isDense
-
- if (isDense) {
+ val vector = vectors.head
+ val matrix: Matrix = if (vector.isDense) {
val matrix = new DenseMatrix(vectors.size, ncolLocal)
vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
- out.collect((keys.toArray(classTag), matrix))
+ matrix
} else {
- val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
- out.collect((keys.toArray(classTag), matrix))
+ new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
}
+
+ Seq((keys.toArray(classTag), matrix))
+ } else {
+ Seq()
}
- }
- })
+ }
- new BlockifiedFlinkDrm(parts, ncol)
+ new BlockifiedFlinkDrm[K](parts, ncol)
}
def asRowWise = this
@@ -83,26 +78,31 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
}
-class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+class BlockifiedFlinkDrm[K: TypeInformation: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
+
def executionEnvironment = ds.getExecutionEnvironment
def context: FlinkDistributedContext = ds.getExecutionEnvironment
+
def isBlockified = true
def asBlockified = this
def asRowWise = {
- val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] {
- def flatMap(tuple: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = tuple match {
- case (keys, block) => keys.view.zipWithIndex.foreach {
- case (key, idx) =>
- out.collect((key, block(idx, ::)))
+ val out = ds.flatMap {
+ tuple =>
+ val keys = tuple._1
+ val block = tuple._2
+
+ keys.view.zipWithIndex.map {
+ case (key, idx) => (key, block(idx, ::))
}
- }
- })
- new RowsFlinkDrm(out, ncol)
+ }
+
+ new RowsFlinkDrm[K](out, ncol)
}
def classTag = implicitly[ClassTag[K]]
+
}
\ No newline at end of file
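asBlockified and asRowWise above form a lossless round trip (up to row order within a block). A minimal in-core sketch of the same packing and unpacking, assuming dense rows and Int keys:

    import org.apache.mahout.math.{DenseMatrix, Matrix, Vector}
    import org.apache.mahout.math.scalabindings.RLikeOps._

    // pack one partition's (key, row) tuples into a single block
    def blockify(rows: Seq[(Int, Vector)], ncol: Int): (Array[Int], Matrix) = {
      val (keys, vectors) = rows.unzip
      val block = new DenseMatrix(vectors.size, ncol)
      vectors.zipWithIndex.foreach { case (v, i) => block(i, ::) := v }
      keys.toArray -> block
    }

    // unpack a block back into (key, row) tuples
    def deblockify(keys: Array[Int], block: Matrix): Seq[(Int, Vector)] =
      keys.toSeq.zipWithIndex.map { case (k, i) => k -> block(i, ::) }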
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
index 24f298d..83ede9a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.io._
import java.util.Arrays
/**
- * Copied from /spark/src/main/scala/org/apache/mahout/common
+ * Flink DRM Metadata
*/
class DrmMetadata(
@@ -40,13 +40,13 @@ class DrmMetadata(
* @param keyW2ValFunc: Conversion from Writable to value type of the DRM key
*/
val (keyClassTag: ClassTag[_], unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match {
- case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
- case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
- case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
- case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
- case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
- case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
- case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
+ case cz if cz == classOf[IntWritable] => ClassTag.Int -> w2int _
+ case cz if cz == classOf[LongWritable] => ClassTag.Long -> w2long _
+ case cz if cz == classOf[DoubleWritable] => ClassTag.Double -> w2double _
+ case cz if cz == classOf[FloatWritable] => ClassTag.Float -> w2float _
+ case cz if cz == classOf[Text] => ClassTag(classOf[String]) -> w2string _
+ case cz if cz == classOf[BooleanWritable] => ClassTag(classOf[Boolean]) -> w2bool _
+ case cz if cz == classOf[BytesWritable] => ClassTag(classOf[Array[Byte]]) -> w2bytes _
case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
}
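The match above pairs each supported Writable key class with a ClassTag and an unwrap function. A usage sketch, assuming the two-argument constructor shown here:

    import org.apache.hadoop.io.IntWritable
    import org.apache.mahout.math.VectorWritable

    val meta = new DrmMetadata(classOf[IntWritable], classOf[VectorWritable])
    val key = meta.unwrapKeyFunction(new IntWritable(42))   // yields 42, tagged ClassTag.Int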
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
index e77143e..b9d9f1b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
@@ -62,17 +62,17 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
val seed = fs.getFileStatus(new Path(dir))
var f: String = files
- if (seed.isDir) {
+ if (seed.isDirectory) {
val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
for (fileStatus <- fileStatuses) {
if (fileStatus.getPath().getName().matches(filePattern)
- && !fileStatus.isDir) {
+ && !fileStatus.isDirectory) {
// found a file
if (fileStatus.getLen() != 0) {
// file is not empty
f = f + fileStatus.getPath.toUri.toString + ","
}
- } else if (fileStatus.isDir && recursive) {
+ } else if (fileStatus.isDirectory && recursive) {
f = findFiles(fileStatus.getPath.toString, filePattern, f)
}
}
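The isDir-to-isDirectory change tracks the Hadoop 2 FileStatus API. A standalone sketch of the same recursive, non-empty-file listing idiom using that API directly:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    def listFiles(fs: FileSystem, dir: Path): Seq[Path] =
      fs.listStatus(dir).toSeq.flatMap { status =>
        if (status.isDirectory) listFiles(fs, status.getPath)   // recurse into subdirectories
        else if (status.getLen > 0) Seq(status.getPath)         // keep non-empty files
        else Seq.empty
      }

    val fs = FileSystem.get(new Configuration())   // hypothetical default-configured filesystem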
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
deleted file mode 100644
index 6581721..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.io
-
-import org.apache.hadoop.io.{ Writable, SequenceFile }
-import org.apache.hadoop.fs.{ FileSystem, Path }
-import org.apache.hadoop.conf.Configuration
-import collection._
-import JavaConversions._
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
- * with Hadoop 2.0
- *
- * Copied from /spark/src/main/scala/org/apache/mahout/common
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
- /**
- * Read the header of a sequence file and determine the Key and Value type
- * @param path
- * @return
- */
- def readDrmHeader(path: String): DrmMetadata = {
- val dfsPath = new Path(path)
- val fs = dfsPath.getFileSystem(new Configuration())
-
- val partFilePath: Path = fs.listStatus(dfsPath)
-
- // Filter out anything starting with .
- .filter { s =>
- !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir
- }
-
- // Take path
- .map(_.getPath)
-
- // Take only one, if any
- .headOption
-
- // Require there's at least one partition file found.
- .getOrElse {
- throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
- }
-
- val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
- try {
- new DrmMetadata(
- keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
- valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
- } finally {
- reader.close()
- }
-
- }
-
- /**
- * Delete a path from the filesystem
- * @param path
- */
- def delete(path: String) {
- val dfsPath = new Path(path)
- val fs = dfsPath.getFileSystem(new Configuration())
-
- if (fs.exists(dfsPath)) {
- fs.delete(dfsPath, true)
- }
- }
-
-}