Posted to issues@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2014/10/14 00:52:33 UTC

[jira] [Commented] (FLINK-1163) Scala DataSet cannot be used within iterations because it is not serializable

    [ https://issues.apache.org/jira/browse/FLINK-1163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170158#comment-14170158 ] 

Till Rohrmann commented on FLINK-1163:
--------------------------------------

Actually, this seems to be a problem with closure cleanup. I can reproduce it with the following job:

{code}
import org.apache.flink.api.scala._

object Job {
  case class Pagerank(node: Int, rank: Double)
  case class AdjacencyRow(node: Int, neighbours: Array[Int])

  val dampingFactor = 0.85
  val maxIterations = 2

  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

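    // numVertices is a local variable of main; the closure passed to iterate references it
    val numVertices = 10
    // getInitialAdjacencyMatrix and getInitialPagerank are helper methods not shown here
    val adjacencyMatrix = getInitialAdjacencyMatrix(numVertices, env)
    val initialPagerank = getInitialPagerank(numVertices, env)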


    val solution = initialPagerank.iterate(maxIterations) {
      _.join(adjacencyMatrix).where(_.node).equalTo(_.node).flatMap {
        _ match {
          case (Pagerank(node, rank), AdjacencyRow(_, neighbours)) => {
            val length = neighbours.length
            (neighbours map {
              Pagerank(_, dampingFactor * rank / length)
            }) :+ Pagerank(node, (1 - dampingFactor) / numVertices)
          }
        }
      }
    }

    solution.print()

    env.execute("Flink Scala API Skeleton")
  }
}
{code}

If I make numVertices a member of the object (like dampingFactor and maxIterations) instead of a local variable in main, everything works properly.
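
To illustrate the general mechanism, here is a minimal sketch that does not use Flink at all. It only shows that a Scala function value fails Java serialization as soon as it captures a reference to a non-serializable object. ClosureCheck, Handle, isSerializable and demo are hypothetical names made up for this illustration; Handle merely stands in for something like a DataSet. Whether this is exactly what happens inside the iteration closure above is an assumption, not something I have verified.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheck {
  // Hypothetical stand-in for a non-serializable object (e.g. a DataSet handle).
  class Handle(val id: Int)

  // Returns true if Java serialization of obj succeeds, false if it
  // fails with a NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Builds two closures over local variables and checks each one.
  def demo(): (Boolean, Boolean) = {
    val n = 10                 // local Int, harmless to capture
    val handle = new Handle(1) // local non-serializable object

    val capturesInt: Int => Int = x => x + n
    val capturesHandle: Int => Int = x => x + handle.id

    (isSerializable(capturesInt), isSerializable(capturesHandle))
  }

  def main(args: Array[String]): Unit = {
    val (intOk, handleOk) = demo()
    println(s"closure capturing an Int is serializable: $intOk")
    println(s"closure capturing a Handle is serializable: $handleOk")
  }
}
```

A closure cleaner tries to drop captured references that the function body does not actually need, so a cleanup that misses one would show up as this kind of serialization failure.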

> Scala DataSet cannot be used within iterations because it is not serializable
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-1163
>                 URL: https://issues.apache.org/jira/browse/FLINK-1163
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Till Rohrmann
>
> The Scala iterations cannot deal with a Scala DataSet in its closure because it is not serializable.


