Posted to issues@flink.apache.org by chobeat <gi...@git.apache.org> on 2016/06/23 13:36:54 UTC

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

GitHub user chobeat opened a pull request:

    https://github.com/apache/flink/pull/2152

    [FLINK-3920] Distributed Linear Algebra: block-based matrix

    Second part of the distributed linear algebra contribution. This PR introduces block-partitioned matrices, operations on them (multiplication and per-row operations), and conversions to and from row-partitioned matrices.
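For readers new to block-partitioned matrices, block multiplication rests on the identity C(i, k) = Σ_j A(i, j) · B(j, k), where each term is an ordinary matrix product of two blocks. The following is a minimal local Scala sketch of that identity, not the PR's distributed Flink implementation. It assumes a hypothetical representation (`BlockMultiplySketch`, `Dense`, `Blocks`) in which a matrix is a Map from (blockRow, blockCol) to a dense block and an absent key stands for an all-zero block.

```scala
// Local sketch only: hypothetical types, not the flink-ml BlockMatrix API.
object BlockMultiplySketch {
  type Dense = Vector[Vector[Double]]
  // (blockRow, blockCol) -> dense block; a missing key means an all-zero block.
  type Blocks = Map[(Int, Int), Dense]

  // Ordinary dense product of two blocks (number of columns of a == number of rows of b).
  def multiplyDense(a: Dense, b: Dense): Dense =
    a.map { row =>
      b.head.indices.toVector.map { c =>
        row.indices.map(k => row(k) * b(k)(c)).sum
      }
    }

  // Element-wise sum of two blocks with the same shape.
  def addDense(a: Dense, b: Dense): Dense =
    a.zip(b).map { case (r1, r2) => r1.zip(r2).map { case (x, y) => x + y } }

  // C(i, k) = sum over j of A(i, j) * B(j, k). Pairs in which either block is
  // missing contribute nothing, so zero blocks are never materialized.
  def multiply(a: Blocks, b: Blocks): Blocks = {
    val products = for {
      ((i, j), blockA) <- a.toSeq
      ((j2, k), blockB) <- b.toSeq
      if j == j2
    } yield ((i, k), multiplyDense(blockA, blockB))
    products.groupBy(_._1).map { case (key, parts) =>
      key -> parts.map(_._2).reduce(addDense(_, _))
    }
  }
}
```

Treating missing keys as zero blocks lets sparsity at the block level skip whole products, which is the same convention the diff's full-outer-join code relies on when it substitutes zero blocks.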
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/radicalbit/flink FLINK-3920

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2152
    
----
commit c59125272cc97be21bfe303fe2bb0cdd70c81915
Author: chobeat <si...@gmail.com>
Date:   2016-06-23T12:37:13Z

    BlockMatrix

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76550821
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
    +
    +class Block() {
    +
    +  var blockData: FlinkMatrix = null
    +
    +  def setBlockData(flinkMatrix: FlinkMatrix) = blockData = flinkMatrix
    +
    +  def getBlockData = blockData
    +
    +  def toBreeze = blockData.asBreeze
    +
    +  def getCols = blockData.numCols
    +
    +  def getRows = blockData.numRows
    --- End diff --
    
    The get/set naming convention is not idiomatic in Scala. We need to rename `getCols` and `getRows` to `numCols` and `numRows`.



[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76551066
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex}
    +
    +/**
    +  * This class is in charge of handling all the spatial logic required by BlockMatrix.
    +  * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates".
    +  * This is a space where blocks are indexed (starting from zero).
    +  *
    +  * So every coordinate in the original space can be mapped to this space and to the
    +  * corresponding block. A block has a row and a column coordinate that make
    +  * its position explicit. Every set of coordinates in the mapped space corresponds to a rectangle
    +  * of size rowsPerBlock x colsPerBlock.
    +  *
    +  */
    +case class BlockMapper( //original matrix size
    +                       numRows: Int,
    +                       numCols: Int,
    +                       //block size
    +                       rowsPerBlock: Int,
    +                       colsPerBlock: Int) {
    +
    +  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
    +  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
    +  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
    +  val numBlocks = numBlockCols * numBlockRows
    +
    +  /**
    +    * Translates absolute coordinates to the mapped coordinates of the block
    +    * these coordinates belong to.
    +    * @param i
    +    * @param j
    +    * @return
    +    */
    +  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) =
    +    getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
    +
    +  /**
    +    * Retrieves a block id from original coordinates
    +    * @param i Original row
    +    * @param j Original column
    +    * @return Block ID
    +    */
    +  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = {
    +
    +    if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
    +      throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
    --- End diff --
    
    The following version using `require` would be better:
    
    ```scala
    require(0 <= i && i < numRows && 0 <= j && j < numCols, s"Invalid coordinates ($i, $j).")
    ```
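A minimal sketch of how the reviewer's `require` suggestion could sit inside the block-ID computation from the diff above (row-major numbering, `mappedRow * numBlockCols + mappedColumn`). `BlockIdSketch` is a hypothetical stand-in for the relevant part of `BlockMapper`. Note that `require` throws an `IllegalArgumentException` whose message is prefixed with `requirement failed: `, so callers that caught the exception type thrown by the original `if`/`throw` keep working.

```scala
// Hypothetical stand-in for the block-id part of BlockMapper.
final case class BlockIdSketch(numRows: Int, numCols: Int, rowsPerBlock: Int, colsPerBlock: Int) {
  // Integer ceiling division; same as math.ceil(n * 1.0 / size).toInt for positive inputs.
  val numBlockCols: Int = (numCols + colsPerBlock - 1) / colsPerBlock

  // Row-major id of the block that contains the absolute cell (i, j).
  def getBlockIdByCoordinates(i: Int, j: Int): Int = {
    require(0 <= i && i < numRows && 0 <= j && j < numCols, s"Invalid coordinates ($i, $j).")
    (i / rowsPerBlock) * numBlockCols + (j / colsPerBlock)
  }
}
```

For a 10 x 10 matrix with 3 x 3 blocks there are 4 block columns, so cell (4, 7) falls in block row 1, block column 2, that is block id 1 * 4 + 2 = 6.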



[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chobeat <gi...@git.apache.org>.
Github user chobeat commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r68715364
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,287 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import java.lang
    +
    +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{createTypeInformation, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.SparseVector
    +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Distributed Matrix represented as blocks. 
    +  * A BlockMapper instance is used to track blocks of the matrix.
    +  * Every block in a BlockMatrix has an associated ID that also 
    +  * identifies its position in the BlockMatrix.
    +  * @param data
    +  * @param blockMapper
    +  */
    +class BlockMatrix(
    +    data: DataSet[(BlockID, Block)],
    +    blockMapper: BlockMapper
    +)
    +    extends DistributedMatrix {
    +
    +  val getDataset = data
    +
    +  val numCols = blockMapper.numCols
    +  val numRows = blockMapper.numRows
    +
    +
    +  val getBlockCols = blockMapper.numBlockCols
    +  val getBlockRows = blockMapper.numBlockRows
    +
    +  val getRowsPerBlock = blockMapper.rowsPerBlock
    +  val getColsPerBlock = blockMapper.colsPerBlock
    +
    +  val getNumBlocks = blockMapper.numBlocks
    +
    +  /**
    +    * Compares the format of two block matrices
    +    * @return
    +    */
    +  def hasSameFormat(other: BlockMatrix): Boolean =
    +    this.numRows == other.numRows &&
    +    this.numCols == other.numCols &&
    +    this.getRowsPerBlock == other.getRowsPerBlock &&
    +    this.getColsPerBlock == other.getColsPerBlock
    +
    +  /**
    +    * Performs an operation on pairs of blocks. Pairs are formed by taking
    +    * matching blocks from the two matrices that are placed in the same position.
    +    * A function is then applied to the pair to return a new block.
    +    * These blocks are then composed in a new block matrix.
    +    */
    +  def blockPairOperation(
    +      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
    +    require(hasSameFormat(other))
    +
    +    /*Full outer join on blocks. The full outer join is required because of
    +    the sparse nature of the matrix.
    +    Matching blocks may be missing and a block of zeros is used instead.*/
    +    val processedBlocks =
    +      this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) {
    +        (left: (BlockID, Block), right: (BlockID, Block)) =>
    +          {
    +
    +            val (id1, block1) = Option(left).getOrElse(
    +                (right._1, Block.zero(right._2.getRows, right._2.getCols)))
    +
    +            val (id2, block2) = Option(right).getOrElse(
    +                (left._1, Block.zero(left._2.getRows, left._2.getCols)))
    +
    +            require(id1 == id2)
    --- End diff --
    
    It's mostly to detect implementation errors, but it may be redundant.
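Since this check guards an internal invariant of the join rather than a caller-supplied argument, one hedged alternative is Scala's `assert`, which throws `AssertionError` and can be elided by the Scala 2 compiler (for example with `-Xdisable-assertions`), while `require` is conventionally kept for preconditions on inputs. The local sketch below mirrors the null handling of the full-outer-join function in the diff; `SimpleBlock` and `pairBlocks` are hypothetical stand-ins, not Flink ML types.

```scala
object BlockPairing {
  type BlockID = Int

  // Hypothetical simplified block: shape plus row-major values.
  final case class SimpleBlock(rows: Int, cols: Int, values: Vector[Double])

  object SimpleBlock {
    def zero(rows: Int, cols: Int): SimpleBlock =
      SimpleBlock(rows, cols, Vector.fill(rows * cols)(0.0))
  }

  // In a full outer join either side may be null (never both); the missing
  // side is replaced by a zero block with the same id and shape.
  def pairBlocks(
      left: (BlockID, SimpleBlock),
      right: (BlockID, SimpleBlock)): (BlockID, SimpleBlock, SimpleBlock) = {
    val (id1, block1) =
      Option(left).getOrElse((right._1, SimpleBlock.zero(right._2.rows, right._2.cols)))
    val (id2, block2) =
      Option(right).getOrElse((left._1, SimpleBlock.zero(left._2.rows, left._2.cols)))
    // Internal invariant: joining on the block id guarantees matching ids.
    assert(id1 == id2, s"Join produced mismatched block ids $id1 and $id2")
    (id1, block1, block2)
  }
}
```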



[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the issue:

    https://github.com/apache/flink/pull/2152
  
    Thanks for opening this pull request, @chobeat! I would like to shepherd this PR. I'll review it this weekend.



[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76551191
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,286 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{createTypeInformation, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.SparseVector
    +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Distributed Matrix represented as blocks. 
    +  * A BlockMapper instance is used to track blocks of the matrix.
    +  * Every block in a BlockMatrix has an associated ID that also 
    +  * identifies its position in the BlockMatrix.
    +  * @param data
    +  * @param blockMapper
    +  */
    +class BlockMatrix(
    +    val data: DataSet[(BlockID, Block)],
    +    blockMapper: BlockMapper
    +)
    +    extends DistributedMatrix {
    +
    +  val numCols = blockMapper.numCols
    +  val numRows = blockMapper.numRows
    +
    +  val getBlockCols = blockMapper.numBlockCols
    +  val getBlockRows = blockMapper.numBlockRows
    +
    +  val getRowsPerBlock = blockMapper.rowsPerBlock
    +  val getColsPerBlock = blockMapper.colsPerBlock
    +
    +  val getNumBlocks = blockMapper.numBlocks
    --- End diff --
    
    Please avoid names that start with `get`/`set`.
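One way to follow this naming advice, sketched under assumptions: the metadata is exposed as `numBlockRows`, `rowsPerBlock`, `numBlocks` and so on, and `hasSameFormat` compares the mappers directly. That last step assumes `blockMapper` is exposed as a `val`, which it is not in the diff above. Because `BlockMapper` is a case class whose constructor fields are exactly the four values `hasSameFormat` compares, structural equality gives the same answer. `MapperSketch` and `BlockMatrixShape` are hypothetical names.

```scala
// Hypothetical copy of BlockMapper's size fields (the real class also has mapping methods).
final case class MapperSketch(numRows: Int, numCols: Int, rowsPerBlock: Int, colsPerBlock: Int) {
  val numBlockRows: Int = (numRows + rowsPerBlock - 1) / rowsPerBlock
  val numBlockCols: Int = (numCols + colsPerBlock - 1) / colsPerBlock
  val numBlocks: Int = numBlockRows * numBlockCols
}

// Block-matrix metadata under Scala-style names instead of getXxx.
class BlockMatrixShape(val blockMapper: MapperSketch) {
  def numRows: Int = blockMapper.numRows
  def numCols: Int = blockMapper.numCols
  def numBlockRows: Int = blockMapper.numBlockRows
  def numBlockCols: Int = blockMapper.numBlockCols
  def rowsPerBlock: Int = blockMapper.rowsPerBlock
  def colsPerBlock: Int = blockMapper.colsPerBlock
  def numBlocks: Int = blockMapper.numBlocks

  // Case-class equality compares numRows, numCols, rowsPerBlock and colsPerBlock,
  // the same four fields the original hasSameFormat checks one by one.
  def hasSameFormat(other: BlockMatrixShape): Boolean = blockMapper == other.blockMapper
}
```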



[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76551085
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex}
    +
    +/**
    +  * This class is in charge of handling all the spatial logic required by BlockMatrix.
    +  * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates".
    +  * This is a space where blocks are indexed (starting from zero).
    +  *
    +  * So every coordinate in the original space can be mapped to this space and to the
    +  * corresponding block. A block has a row and a column coordinate that make
    +  * its position explicit. Every set of coordinates in the mapped space corresponds to a rectangle
    +  * of size rowsPerBlock x colsPerBlock.
    +  *
    +  */
    +case class BlockMapper( //original matrix size
    +                       numRows: Int,
    +                       numCols: Int,
    +                       //block size
    +                       rowsPerBlock: Int,
    +                       colsPerBlock: Int) {
    +
    +  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
    +  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
    +  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
    +  val numBlocks = numBlockCols * numBlockRows
    +
    +  /**
    +    * Translates absolute coordinates to the mapped coordinates of the block
    +    * these coordinates belong to.
    +    * @param i
    +    * @param j
    +    * @return
    +    */
    +  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) =
    +    getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
    +
    +  /**
    +    * Retrieves a block id from original coordinates
    +    * @param i Original row
    +    * @param j Original column
    +    * @return Block ID
    +    */
    +  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = {
    +
    +    if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
    +      throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
    +    } else {
    +      val mappedRow = i / rowsPerBlock
    +      val mappedColumn = j / colsPerBlock
    +      val res = mappedRow * numBlockCols + mappedColumn
    +
    +      assert(res <= numBlocks)
    +      res
    +    }
    +  }
    +
    +  /**
    +    * Retrieves mapped coordinates for a given block.
    +    * @param blockId
    +    * @return
    +    */
    +  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
    +    if (blockId < 0 || blockId > numBlockCols * numBlockRows) {
    +      throw new IllegalArgumentException(
    +          s"BlockId numeration starts from 0. $blockId is not a valid Id"
    +      )
    +    } else {
    +      val i = blockId / numBlockCols
    +      val j = blockId % numBlockCols
    +      (i, j)
    +    }
    +  }
    +
    +  /**
    +    * Retrieves the ID of the block at the given coordinates
    +    * @param i
    +    * @param j
    +    * @return
    +    */
    +  def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
    +
    +    if (i < 0 || j < 0 || i >= numBlockRows || j >= numBlockCols) {
    +      throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
    --- End diff --
    
    Please use `require`.



[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

Posted by chobeat <gi...@git.apache.org>.
Github user chobeat commented on the issue:

    https://github.com/apache/flink/pull/2152
  
    Is Travis still broken? I see that it passed for one build but failed for the others. Is there anything more to review on this PR, or can we proceed?



[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76552087
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
    @@ -159,3 +183,68 @@ case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[
     
       override def toString: String = s"($rowIndex, ${values.toString})"
     }
    +
    +/**
    +  * Serializable Reduction function used by the toBlockMatrix function. Takes an ordered list of
    +  * indexed rows and splits those rows to form blocks.
    +  */
    +class RowGroupReducer(blockMapper: BlockMapper)
    +    extends RichGroupReduceFunction[(Int, Int, Vector), (Int, Block)] {
    +
    +  override def reduce(values: java.lang.Iterable[(Int, Int, Vector)],
    +                      out: Collector[(Int, Block)]): Unit = {
    +
    +    val sortedRows = values.toList.sortBy(_._2)
    +    val blockID = sortedRows.head._1
    +    val coo = for {
    +      (_, rowIndex, vec) <- sortedRows
    +      (colIndex, value) <- vec if value != 0
    +    } yield (rowIndex, colIndex, value)
    +
    +    val block: Block = Block(
    +        SparseMatrix.fromCOO(
    +            blockMapper.rowsPerBlock, blockMapper.colsPerBlock, coo))
    +    out.collect((blockID, block))
    +  }
    +}
    +
    +class RowSplitter(blockMapper: BlockMapper)
    +    extends RichFlatMapFunction[IndexedRow, (Int, Int, Vector)] {
    +  override def flatMap(
    +      row: IndexedRow, out: Collector[(Int, Int, Vector)]): Unit = {
    +    val IndexedRow(rowIndex, vector) = row
    +    val splitRow = sliceVector(vector)
    +    for ((mappedCol, slice) <- splitRow) {
    +      val mappedRow =
    +        math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt
    +      val blockID = blockMapper.getBlockIdByMappedCoord(mappedRow, mappedCol)
    +      out.collect((blockID, rowIndex % blockMapper.rowsPerBlock, slice))
    +    }
    +  }
    +
    +  def sliceVector(v: Vector): List[(Int, Vector)] = {
    +
    +    def getSliceByColIndex(index: Int): Int =
    --- End diff --
    
    This method is used only once. Please inline it and remove the method.
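The helper's body is cut off in the diff above, so the following is only a guess at its role. Assuming `getSliceByColIndex` maps a column index to its block column (`col / colsPerBlock`), inlining it could look like this. `RowSlicing` and `SparseRow` are hypothetical stand-ins for the PR's code and for a sparse Flink `Vector`, and this version emits only non-empty slices, which may differ from the PR.

```scala
object RowSlicing {
  // Hypothetical stand-in for a sparse vector: logical size plus non-zero entries.
  final case class SparseRow(size: Int, entries: Map[Int, Double])

  // Splits one matrix row into (blockColumn, slice) pairs. Column indices inside each
  // slice are re-based to the block, and the block-column lookup is inlined.
  def sliceVector(v: SparseRow, colsPerBlock: Int): List[(Int, SparseRow)] =
    v.entries.toList
      .groupBy { case (col, _) => col / colsPerBlock }
      .toList
      .sortBy(_._1)
      .map { case (blockCol, cells) =>
        val local = cells.map { case (col, value) => (col % colsPerBlock, value) }.toMap
        (blockCol, SparseRow(colsPerBlock, local))
      }
}
```

Each slice has width `colsPerBlock`, matching the `SparseMatrix.fromCOO(rowsPerBlock, colsPerBlock, coo)` call in `RowGroupReducer`.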



[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76550602
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
    +
    +class Block() {
    +
    +  var blockData: FlinkMatrix = null
    --- End diff --
    
    Do we need the block data to be mutable?
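If mutability is not needed, a hedged alternative is an immutable block whose data is fixed at construction, with a factory for the zero blocks that `blockPairOperation` creates. Flink's Scala API can serialize case classes, so a no-argument constructor plus a `var` should not be required for serialization alone, though this is worth checking against how `Block` is used elsewhere in the PR. `MatrixLike`, `DenseData` and `ImmutableBlock` are hypothetical names chosen so the sketch compiles without Flink.

```scala
// Hypothetical minimal interface standing in for org.apache.flink.ml.math.Matrix.
trait MatrixLike {
  def numRows: Int
  def numCols: Int
  def apply(row: Int, col: Int): Double
}

// Dense row-major stand-in implementation.
final case class DenseData(numRows: Int, numCols: Int, values: Vector[Double]) extends MatrixLike {
  require(values.length == numRows * numCols, "values must have numRows * numCols entries")
  def apply(row: Int, col: Int): Double = values(row * numCols + col)
}

// Immutable block: the data is a constructor val, so there is no setter and no null state.
final case class ImmutableBlock(data: MatrixLike) {
  def numRows: Int = data.numRows
  def numCols: Int = data.numCols
}

object ImmutableBlock {
  // Zero block of the given shape, e.g. for a side missing from a full outer join.
  def zero(rows: Int, cols: Int): ImmutableBlock =
    ImmutableBlock(DenseData(rows, cols, Vector.fill(rows * cols)(0.0)))
}
```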



[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76551077
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex}
    +
    +/**
    +  * This class is in charge of handling all the spatial logic required by BlockMatrix.
    +  * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates".
    +  * This is a space where blocks are indexed (starting from zero).
    +  *
    +  * So every coordinate in the original space can be mapped to this space and to the
    +  * corresponding block. A block has a row and a column coordinate that make
    +  * its position explicit. Every set of coordinates in the mapped space corresponds to a rectangle
    +  * of size rowsPerBlock x colsPerBlock.
    +  *
    +  */
    +case class BlockMapper( //original matrix size
    +                       numRows: Int,
    +                       numCols: Int,
    +                       //block size
    +                       rowsPerBlock: Int,
    +                       colsPerBlock: Int) {
    +
    +  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
    +  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
    +  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
    +  val numBlocks = numBlockCols * numBlockRows
    +
    +  /**
    +    * Translates absolute coordinates to the mapped coordinates of the block
    +    * these coordinates belong to.
    +    * @param i
    +    * @param j
    +    * @return
    +    */
    +  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) =
    +    getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
    +
    +  /**
    +    * Retrieves a block id from original coordinates
    +    * @param i Original row
    +    * @param j Original column
    +    * @return Block ID
    +    */
    +  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = {
    +
    +    if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
    +      throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
    +    } else {
    +      val mappedRow = i / rowsPerBlock
    +      val mappedColumn = j / colsPerBlock
    +      val res = mappedRow * numBlockCols + mappedColumn
    +
    +      assert(res <= numBlocks)
    +      res
    +    }
    +  }
    +
    +  /**
    +    * Retrieves mapped coordinates for a given block.
    +    * @param blockId
    +    * @return
    +    */
    +  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
    +    if (blockId < 0 || blockId > numBlockCols * numBlockRows) {
    +      throw new IllegalArgumentException(
    +          s"BlockId numeration starts from 0. $blockId is not a valid Id"
    +      )
    --- End diff --
    
    Please use the `require` function, as above.
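Applying the same `require` pattern to both mapped-coordinate conversions (this one and `getBlockIdByMappedCoord`) might look like the sketch below. One point worth double-checking in the diff: block ids are zero-based, so the last valid id is `numBlocks - 1`, but the guard `blockId > numBlockCols * numBlockRows` still accepts `blockId == numBlocks`. Likewise, `assert(res <= numBlocks)` in `getBlockIdByCoordinates` is looser than necessary. The sketch uses a strict `<`. `MappedCoordSketch` is a hypothetical name.

```scala
// Hypothetical stand-in for the mapped-coordinate part of BlockMapper.
final case class MappedCoordSketch(numBlockRows: Int, numBlockCols: Int) {
  val numBlocks: Int = numBlockRows * numBlockCols

  // Block id -> (blockRow, blockCol), row-major.
  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
    require(0 <= blockId && blockId < numBlocks,
      s"BlockId numbering starts from 0. $blockId is not a valid id.")
    (blockId / numBlockCols, blockId % numBlockCols)
  }

  // (blockRow, blockCol) -> block id; the inverse of the method above.
  def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
    require(0 <= i && i < numBlockRows && 0 <= j && j < numBlockCols,
      s"Invalid coordinates ($i, $j).")
    i * numBlockCols + j
  }
}
```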



[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

Posted by chobeat <gi...@git.apache.org>.
Github user chobeat commented on the issue:

    https://github.com/apache/flink/pull/2152
  
    Don't worry, take your time. I don't have much time to work on this in the coming days. I just don't want to lose the work done.



[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r68891682
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/distributed/BlockMatrixSuite.scala ---
    @@ -0,0 +1,106 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.ml.math.SparseMatrix
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{FlatSpec, GivenWhenThen, Matchers}
    +
    +class BlockMatrixSuite
    +    extends FlatSpec
    +    with Matchers
    +    with GivenWhenThen
    +    with FlinkTestBase {
    +
    +  val env = ExecutionEnvironment.getExecutionEnvironment
    --- End diff --
    
    I found that the Travis CI build fails because of this line. We cannot reuse the `ExecutionEnvironment` because of the class initialization order; this seems related to FLINK-3994. Please move the initialization of `ExecutionEnvironment` and `DistributedRowMatrix` into each test case.
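The initialization-order problem can be illustrated without Flink. In the hypothetical sketch below, `TestContext` plays the role of the environment that a test base class installs in a `beforeAll`-style setup hook; this is an assumption about the general mechanism, not a description of `FlinkTestBase`'s code. A `val` in the class body is evaluated in the constructor, before the hook runs, so it captures the stale default, while a lookup inside the test case sees the installed environment.

```scala
// Hypothetical global context, standing in for the execution environment
// that a test base class installs before the tests run.
object TestContext {
  @volatile var current: String = "default-local-env"
}

// Hypothetical test base: installs the test environment in a setup hook.
abstract class SketchTestBase {
  def beforeAll(): Unit = { TestContext.current = "test-cluster-env" }
}

class SketchSuite extends SketchTestBase {
  // Evaluated in the constructor, i.e. BEFORE beforeAll() runs: stale value.
  val envInClassBody: String = TestContext.current

  // Evaluated when the test case runs, i.e. AFTER beforeAll(): current value.
  def testCase(): String = TestContext.current
}

object InitOrderDemo {
  def run(): (String, String) = {
    TestContext.current = "default-local-env"
    val suite = new SketchSuite // the constructor runs first...
    suite.beforeAll()           // ...then the setup hook
    (suite.envInClassBody, suite.testCase())
  }
}
```

Moving the lookup into each test case, as suggested, corresponds to `testCase()` here.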


---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chobeat <gi...@git.apache.org>.
Github user chobeat commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r68713373
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,287 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import java.lang
    --- End diff --
    
    Sorry, IntelliJ imported it this way because of `java.lang.Iterable`; I don't know why. I will import only `Iterable`.
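A minimal sketch of the narrower import: renaming `java.lang.Iterable` on import keeps it distinct from Scala's own `Iterable` without bringing the whole `java.lang` package in as a name. The `sumAll` helper is hypothetical; it only mimics the `java.lang.Iterable` argument that Flink's Java-facing group-reduce functions (such as `RichGroupReduceFunction.reduce`) receive.

```scala
// Import only the Java Iterable, under an alias, instead of `import java.lang`.
import java.lang.{Iterable => JIterable}

object IterableImportSketch {
  // Hypothetical helper: consumes a java.lang.Iterable the way a
  // group-reduce function would, using only the Java iterator API.
  def sumAll(values: JIterable[Integer]): Int = {
    val it = values.iterator()
    var total = 0
    while (it.hasNext) total += it.next().intValue()
    total
  }
}
```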


---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r68885212
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,287 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import java.lang
    --- End diff --
    
    This line still imports `java.lang`.


---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76550871
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala ---
    @@ -0,0 +1,73 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
    +
    +class Block() {
    --- End diff --
    
    How about changing `Block` to a case class? That would be better than a plain class with an `apply` method.
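A rough sketch of the suggestion, using a hypothetical dense representation (`values` as an immutable, row-major `Vector[Double]`) instead of the PR's `FlinkMatrix`-backed block. A case class gets `apply`, `unapply`, `equals`/`hashCode` and `copy` for free, so only extra factories such as `zero` (mirroring the `Block.zero(rows, cols)` calls used elsewhere in the PR) need to live in the companion object.

```scala
// Hypothetical simplified block: the PR wraps a Flink Matrix instead.
// Vector (not Array) is used so that case-class equality is structural.
final case class Block(numRows: Int, numCols: Int, values: Vector[Double]) {
  require(values.length == numRows * numCols,
    s"Expected ${numRows * numCols} values, got ${values.length}")

  /** Entry (i, j), stored in row-major order. */
  def apply(i: Int, j: Int): Double = values(i * numCols + j)

  /** Element-wise sum of two blocks of the same shape. */
  def +(other: Block): Block = {
    require(numRows == other.numRows && numCols == other.numCols,
      s"Shape mismatch: ${numRows}x$numCols vs ${other.numRows}x${other.numCols}")
    copy(values = values.zip(other.values).map { case (a, b) => a + b })
  }
}

object Block {
  /** Zero-filled block of the given shape. */
  def zero(numRows: Int, numCols: Int): Block =
    Block(numRows, numCols, Vector.fill(numRows * numCols)(0.0))
}
```

Using `Vector` rather than `Array` is a deliberate choice here: arrays compare by reference, which would defeat the structural equality a case class is meant to provide.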


---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76551144
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,286 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{createTypeInformation, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.SparseVector
    +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Distributed Matrix represented as blocks. 
    +  * A BlockMapper instance is used to track blocks of the matrix.
    +  * Every block in a BlockMatrix has an associated ID that also 
    +  * identifies its position in the BlockMatrix.
    +  * @param data
    +  * @param blockMapper
    +  */
    +class BlockMatrix(
    +    val data: DataSet[(BlockID, Block)],
    +    blockMapper: BlockMapper
    +)
    +    extends DistributedMatrix {
    +
    +  val numCols = blockMapper.numCols
    +  val numRows = blockMapper.numRows
    +
    +  val getBlockCols = blockMapper.numBlockCols
    +  val getBlockRows = blockMapper.numBlockRows
    +
    +  val getRowsPerBlock = blockMapper.rowsPerBlock
    +  val getColsPerBlock = blockMapper.colsPerBlock
    +
    +  val getNumBlocks = blockMapper.numBlocks
    +
    +  /**
    +    * Compares the format of two block matrices
    +    * @return
    +    */
    +  def hasSameFormat(other: BlockMatrix): Boolean =
    +    this.numRows == other.numRows && this.numCols == other.numCols &&
    +    this.getRowsPerBlock == other.getRowsPerBlock &&
    +    this.getColsPerBlock == other.getColsPerBlock
    +
    +  /**
    +    * Perform an operation on pairs of block. Pairs are formed taking
    +    * matching blocks from the two matrices that are placed in the same position.
    +    * A function is then applied to the pair to return a new block.
    +    * These blocks are then composed in a new block matrix.
    +    */
    +  def blockPairOperation(
    +      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
    +    require(hasSameFormat(other))
    --- End diff --
    
    Could you add a message that explains what the problem is?
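A minimal sketch of a `require` with a descriptive message. `BlockFormat` is a hypothetical value holding the four fields that `hasSameFormat` compares (row count, column count, rows per block, columns per block); the message reports both formats, so a failing call shows which dimension differs.

```scala
// Hypothetical summary of the four fields compared by hasSameFormat.
final case class BlockFormat(numRows: Int, numCols: Int, rowsPerBlock: Int, colsPerBlock: Int)

object FormatCheck {
  def requireSameFormat(left: BlockFormat, right: BlockFormat): Unit =
    require(left == right,
      s"Block matrices must have the same format to be combined block-wise, " +
        s"but got $left and $right")
}
```

The message parameter of `require` is by-name, so the string is only built when the check actually fails.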


---

[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

Posted by chobeat <gi...@git.apache.org>.
Github user chobeat commented on the issue:

    https://github.com/apache/flink/pull/2152
  
    Any news?


---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76580310
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,286 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{createTypeInformation, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.SparseVector
    +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Distributed Matrix represented as blocks. 
    +  * A BlockMapper instance is used to track blocks of the matrix.
    +  * Every block in a BlockMatrix has an associated ID that also 
    +  * identifies its position in the BlockMatrix.
    +  * @param data
    +  * @param blockMapper
    +  */
    +class BlockMatrix(
    +    val data: DataSet[(BlockID, Block)],
    +    blockMapper: BlockMapper
    +)
    +    extends DistributedMatrix {
    +
    +  val numCols = blockMapper.numCols
    +  val numRows = blockMapper.numRows
    +
    +  val getBlockCols = blockMapper.numBlockCols
    +  val getBlockRows = blockMapper.numBlockRows
    +
    +  val getRowsPerBlock = blockMapper.rowsPerBlock
    +  val getColsPerBlock = blockMapper.colsPerBlock
    +
    +  val getNumBlocks = blockMapper.numBlocks
    +
    +  /**
    +    * Compares the format of two block matrices
    +    * @return
    +    */
    +  def hasSameFormat(other: BlockMatrix): Boolean =
    +    this.numRows == other.numRows && this.numCols == other.numCols &&
    +    this.getRowsPerBlock == other.getRowsPerBlock &&
    +    this.getColsPerBlock == other.getColsPerBlock
    +
    +  /**
    +    * Perform an operation on pairs of block. Pairs are formed taking
    +    * matching blocks from the two matrices that are placed in the same position.
    +    * A function is then applied to the pair to return a new block.
    +    * These blocks are then composed in a new block matrix.
    +    */
    +  def blockPairOperation(
    +      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
    +    require(hasSameFormat(other))
    +
    +    /*Full outer join on blocks. The full outer join is required because of
    +    the sparse nature of the matrix.
    +    Matching blocks may be missing and a block of zeros is used instead.*/
    +    val processedBlocks =
    +      this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
    +        (left: (BlockID, Block), right: (BlockID, Block)) =>
    +          {
    +
    +            val (id1, block1) = Option(left) match {
    +              case Some((id, block)) => (id, block)
    +              case None =>
    +                (right._1, Block.zero(right._2.getRows, right._2.getCols))
    --- End diff --
    
    Ah, I see. `Block.zero` costs very little. Sorry for the inconvenience.


---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76551557
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,286 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{createTypeInformation, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.SparseVector
    +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Distributed Matrix represented as blocks. 
    +  * A BlockMapper instance is used to track blocks of the matrix.
    +  * Every block in a BlockMatrix has an associated ID that also 
    +  * identifies its position in the BlockMatrix.
    +  * @param data
    +  * @param blockMapper
    +  */
    +class BlockMatrix(
    +    val data: DataSet[(BlockID, Block)],
    +    blockMapper: BlockMapper
    +)
    +    extends DistributedMatrix {
    +
    +  val numCols = blockMapper.numCols
    +  val numRows = blockMapper.numRows
    +
    +  val getBlockCols = blockMapper.numBlockCols
    +  val getBlockRows = blockMapper.numBlockRows
    +
    +  val getRowsPerBlock = blockMapper.rowsPerBlock
    +  val getColsPerBlock = blockMapper.colsPerBlock
    +
    +  val getNumBlocks = blockMapper.numBlocks
    +
    +  /**
    +    * Compares the format of two block matrices
    +    * @return
    +    */
    +  def hasSameFormat(other: BlockMatrix): Boolean =
    +    this.numRows == other.numRows && this.numCols == other.numCols &&
    +    this.getRowsPerBlock == other.getRowsPerBlock &&
    +    this.getColsPerBlock == other.getColsPerBlock
    +
    +  /**
    +    * Perform an operation on pairs of block. Pairs are formed taking
    +    * matching blocks from the two matrices that are placed in the same position.
    +    * A function is then applied to the pair to return a new block.
    +    * These blocks are then composed in a new block matrix.
    +    */
    +  def blockPairOperation(
    +      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
    +    require(hasSameFormat(other))
    +
    +    /*Full outer join on blocks. The full outer join is required because of
    +    the sparse nature of the matrix.
    +    Matching blocks may be missing and a block of zeros is used instead.*/
    +    val processedBlocks =
    +      this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
    +        (left: (BlockID, Block), right: (BlockID, Block)) =>
    +          {
    +
    +            val (id1, block1) = Option(left) match {
    +              case Some((id, block)) => (id, block)
    +              case None =>
    +                (right._1, Block.zero(right._2.getRows, right._2.getCols))
    --- End diff --
    
    The zero-filled matrix can be reused. Please create it only once, using the `RichFlatJoinFunction` class.
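To make the trade-off concrete, here is a plain-Scala sketch (no Flink) of the block-wise full outer join that `blockPairOperation` performs, with the zero block allocated once and reused for every missing partner. Blocks are hypothetically modelled as row-major `Vector[Double]` values keyed by block ID, and the sketch assumes every block has the same number of entries. In a Flink job, a one-time allocation like this would typically go in the `open()` method of a rich function such as `RichFlatJoinFunction`.

```scala
object BlockPairJoinSketch {
  type BlockID = Int
  // Hypothetical block representation: row-major values of a fixed-size block.
  type Blk = Vector[Double]

  /** Full outer join on block IDs; a missing partner is replaced by `zero`. */
  def blockPairOperation(
      left: Map[BlockID, Blk],
      right: Map[BlockID, Blk],
      blockSize: Int)(fun: (Blk, Blk) => Blk): Map[BlockID, Blk] = {
    // Allocated once and shared by every pair that lacks a partner
    // (assumes all blocks have `blockSize` entries).
    val zero: Blk = Vector.fill(blockSize)(0.0)
    (left.keySet ++ right.keySet).iterator.map { id =>
      id -> fun(left.getOrElse(id, zero), right.getOrElse(id, zero))
    }.toMap
  }

  /** Element-wise sum, as an example block function. */
  def add(a: Blk, b: Blk): Blk = a.zip(b).map { case (x, y) => x + y }
}
```

The single shared `zero` only works under the uniform-size assumption; the quoted PR code instead sizes each zero block from its partner (`Block.zero(right._2.getRows, right._2.getCols)`).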


---

[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the issue:

    https://github.com/apache/flink/pull/2152
  
    Sorry for not responding for so long. Could you give me a few more days? I'll review it over the weekend. Sorry again.


---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76551568
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,286 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{createTypeInformation, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.SparseVector
    +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Distributed Matrix represented as blocks. 
    +  * A BlockMapper instance is used to track blocks of the matrix.
    +  * Every block in a BlockMatrix has an associated ID that also 
    +  * identifies its position in the BlockMatrix.
    +  * @param data
    +  * @param blockMapper
    +  */
    +class BlockMatrix(
    +    val data: DataSet[(BlockID, Block)],
    +    blockMapper: BlockMapper
    +)
    +    extends DistributedMatrix {
    +
    +  val numCols = blockMapper.numCols
    +  val numRows = blockMapper.numRows
    +
    +  val getBlockCols = blockMapper.numBlockCols
    +  val getBlockRows = blockMapper.numBlockRows
    +
    +  val getRowsPerBlock = blockMapper.rowsPerBlock
    +  val getColsPerBlock = blockMapper.colsPerBlock
    +
    +  val getNumBlocks = blockMapper.numBlocks
    +
    +  /**
    +    * Compares the format of two block matrices
    +    * @return
    +    */
    +  def hasSameFormat(other: BlockMatrix): Boolean =
    +    this.numRows == other.numRows && this.numCols == other.numCols &&
    +    this.getRowsPerBlock == other.getRowsPerBlock &&
    +    this.getColsPerBlock == other.getColsPerBlock
    +
    +  /**
    +    * Perform an operation on pairs of block. Pairs are formed taking
    +    * matching blocks from the two matrices that are placed in the same position.
    +    * A function is then applied to the pair to return a new block.
    +    * These blocks are then composed in a new block matrix.
    +    */
    +  def blockPairOperation(
    +      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
    +    require(hasSameFormat(other))
    +
    +    /*Full outer join on blocks. The full outer join is required because of
    +    the sparse nature of the matrix.
    +    Matching blocks may be missing and a block of zeros is used instead.*/
    +    val processedBlocks =
    +      this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
    +        (left: (BlockID, Block), right: (BlockID, Block)) =>
    +          {
    +
    +            val (id1, block1) = Option(left) match {
    +              case Some((id, block)) => (id, block)
    +              case None =>
    +                (right._1, Block.zero(right._2.getRows, right._2.getCols))
    +            }
    +
    +            val (id2, block2) = Option(right) match {
    +              case Some((id, block)) => (id, block)
    +              case None =>
    +                (left._1, Block.zero(left._2.getRows, left._2.getCols))
    --- End diff --
    
    Please reuse the zero-filled matrix here as well.


---

[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

Posted by chobeat <gi...@git.apache.org>.
Github user chobeat commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76575651
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
    @@ -0,0 +1,286 @@
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{createTypeInformation, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.SparseVector
    +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Distributed Matrix represented as blocks. 
    +  * A BlockMapper instance is used to track blocks of the matrix.
    +  * Every block in a BlockMatrix has an associated ID that also 
    +  * identifies its position in the BlockMatrix.
    +  * @param data
    +  * @param blockMapper
    +  */
    +class BlockMatrix(
    +    val data: DataSet[(BlockID, Block)],
    +    blockMapper: BlockMapper
    +)
    +    extends DistributedMatrix {
    +
    +  val numCols = blockMapper.numCols
    +  val numRows = blockMapper.numRows
    +
    +  val getBlockCols = blockMapper.numBlockCols
    +  val getBlockRows = blockMapper.numBlockRows
    +
    +  val getRowsPerBlock = blockMapper.rowsPerBlock
    +  val getColsPerBlock = blockMapper.colsPerBlock
    +
    +  val getNumBlocks = blockMapper.numBlocks
    +
    +  /**
    +    * Compares the format of two block matrices
    +    * @return
    +    */
    +  def hasSameFormat(other: BlockMatrix): Boolean =
    +    this.numRows == other.numRows && this.numCols == other.numCols &&
    +    this.getRowsPerBlock == other.getRowsPerBlock &&
    +    this.getColsPerBlock == other.getColsPerBlock
    +
    +  /**
    +    * Perform an operation on pairs of block. Pairs are formed taking
    +    * matching blocks from the two matrices that are placed in the same position.
    +    * A function is then applied to the pair to return a new block.
    +    * These blocks are then composed in a new block matrix.
    +    */
    +  def blockPairOperation(
    +      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
    +    require(hasSameFormat(other))
    +
    +    /*Full outer join on blocks. The full outer join is required because of
    +    the sparse nature of the matrix.
    +    Matching blocks may be missing and a block of zeros is used instead.*/
    +    val processedBlocks =
    +      this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
    +        (left: (BlockID, Block), right: (BlockID, Block)) =>
    +          {
    +
    +            val (id1, block1) = Option(left) match {
    +              case Some((id, block)) => (id, block)
    +              case None =>
    +                (right._1, Block.zero(right._2.getRows, right._2.getCols))
    --- End diff --
    
    Those matrices may have a different size for every block pair. Many will have the same size, but we would still need to track the size of every zero matrix. I don't believe this level of optimization is useful or would have a noticeable impact at this stage.
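If reuse were wanted despite varying block sizes, one option (not one proposed in the thread) is to cache zero blocks per shape, so that each distinct `(rows, cols)` pair is allocated at most once. A minimal sketch, with blocks hypothetically modelled as row-major `Vector[Double]` values and `ZeroBlockCache` a hypothetical helper. In a grid with fixed rows per block and columns per block, only the last block row and column can be smaller, so there are at most four distinct shapes and the cache stays small.

```scala
import scala.collection.mutable

// Hypothetical helper: hands out one shared zero block per (rows, cols) shape.
final class ZeroBlockCache {
  private val cache = mutable.HashMap.empty[(Int, Int), Vector[Double]]

  /** Returns the cached zero block for this shape, allocating it on first use. */
  def zero(rows: Int, cols: Int): Vector[Double] =
    cache.getOrElseUpdate((rows, cols), Vector.fill(rows * cols)(0.0))

  /** Number of distinct shapes allocated so far. */
  def distinctShapes: Int = cache.size
}
```

Sharing is safe here because `Vector` is immutable; if the zero blocks were mutable matrices, the caller would have to make sure no operation writes into a shared instance.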



---

[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the issue:

    https://github.com/apache/flink/pull/2152
  
    This PR contains unnecessary code changes related to `DistributedRowMatrix` (`DistributedRowMatrix.scala` and `DistributedRowMatrixSuite.scala`). Please sync this PR with the current master.


---