Posted to issues@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2014/10/14 00:52:33 UTC
[jira] [Commented] (FLINK-1163) Scala DataSet cannot be used within iterations because it is not serializable
[ https://issues.apache.org/jira/browse/FLINK-1163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170158#comment-14170158 ]
Till Rohrmann commented on FLINK-1163:
--------------------------------------
Well, actually it seems to be a problem of proper closure cleanup. The problem occurs for me in the following case:
{code}
object Job {
  case class Pagerank(node: Int, rank: Double)
  case class AdjacencyRow(node: Int, neighbours: Array[Int])

  val dampingFactor = 0.85
  val maxIterations = 2

  def main(args: Array[String]) {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val numVertices = 10
    val adjacencyMatrix = getInitialAdjacencyMatrix(numVertices, env)
    val initialPagerank = getInitialPagerank(numVertices, env)

    val solution = initialPagerank.iterate(maxIterations) {
      _.join(adjacencyMatrix).where(_.node).equalTo(_.node).flatMap {
        _ match {
          case (Pagerank(node, rank), AdjacencyRow(_, neighbours)) => {
            val length = neighbours.length
            (neighbours map {
              Pagerank(_, dampingFactor * rank / length)
            }) :+ Pagerank(node, (1 - dampingFactor) / numVertices)
          }
        }
      }
    }

    solution.print()
    env.execute("Flink Scala API Skeleton")
  }
}
{code}
If I make numVertices an object member instead of a local variable, everything works properly.
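The capture mechanism can be reproduced outside Flink. The sketch below (the Holder class and the serializable helper are hypothetical, not part of Flink) shows the general rule: a function value that refers to a field of an enclosing, non-serializable instance captures `this` and drags the whole instance into serialization, while copying the value into a local val first keeps the closure self-contained:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical illustration (not Flink code): Holder is deliberately NOT
// Serializable, so any closure that captures a Holder instance cannot be
// Java-serialized.
class Holder(val factor: Int) {
  // Refers to this.factor, so the lambda captures `this` (the Holder).
  def capturesThis: Int => Int = x => x * factor

  // Copies the field into a local val first; the lambda captures only the Int.
  def capturesLocal: Int => Int = {
    val f = factor
    x => x * f
  }
}

// Try to Java-serialize an object; returns false if a captured reference
// is not serializable.
def serializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
    true
  } catch { case _: NotSerializableException => false }
```

The PageRank job above presumably hits the same mechanism in reverse: referencing the local numVertices makes the nested flatMap function capture main's environment, DataSet included, whereas an object member does not pull the surrounding locals in.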
> Scala DataSet cannot be used within iterations because it is not serializable
> -----------------------------------------------------------------------------
>
> Key: FLINK-1163
> URL: https://issues.apache.org/jira/browse/FLINK-1163
> Project: Flink
> Issue Type: Bug
> Reporter: Till Rohrmann
>
> The Scala iterations cannot deal with a Scala DataSet in their closures because it is not serializable.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)