Posted to issues@flink.apache.org by "Johannes (JIRA)" <ji...@apache.org> on 2016/09/06 21:21:20 UTC

[jira] [Created] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

Johannes created FLINK-4586:
-------------------------------

             Summary: NumberSequenceIterator and Accumulator threading issue
                 Key: FLINK-4586
                 URL: https://issues.apache.org/jira/browse/FLINK-4586
             Project: Flink
          Issue Type: Bug
          Components: DataSet API
    Affects Versions: 1.1.2
            Reporter: Johannes
            Priority: Minor


There is a strange problem when using the NumberSequenceIterator in combination with an AverageAccumulator.

It seems that the individual accumulators are reinitialized and overwrite parts of the intermediate result.

The following Scala snippet demonstrates the problem.
The accumulator should report the average {{50.5}}, but instead it reports something completely different, like {{8.08}}, depending on the number of cores used.
If the parallelism is set to {{1}}, the result is correct, which suggests a threading problem. The problem occurs with both the Java and the Scala API.

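{code}
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.AverageAccumulator
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.NumberSequenceIterator

// env is assumed to be a batch ExecutionEnvironment with default parallelism
val env = ExecutionEnvironment.getExecutionEnvironment

env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    // Feed every element into the accumulator and pass it through unchanged
    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    // Register one accumulator per parallel task instance
    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult

// Expected 50.5; with parallelism > 1 a different value is printed
println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}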
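
For reference, here is a minimal plain Scala sketch (no Flink involved) of the value the accumulator is expected to report regardless of how the input is split. The names {{ExpectedAverage}}, {{Partial}} and {{mergedAverage}} are made up for this illustration and are not part of Flink. It only shows the expected outcome and is not a diagnosis of the cause.

```scala
object ExpectedAverage {
  // Hypothetical partial state: a running sum and an element count.
  final case class Partial(sum: Long, count: Long) {
    // Merging two partial results adds sums and counts separately.
    def merge(other: Partial): Partial =
      Partial(sum + other.sum, count + other.count)

    def average: Double =
      if (count == 0) 0.0 else sum.toDouble / count
  }

  // Splits 1..100 into at most `parallelism` chunks, builds one partial
  // result per chunk, and merges them into a single average.
  // Assumes parallelism >= 1.
  def mergedAverage(parallelism: Int): Double = {
    val values = (1L to 100L).toVector
    val chunkSize = math.ceil(values.size.toDouble / parallelism).toInt
    values
      .grouped(chunkSize)
      .map(chunk => Partial(chunk.sum, chunk.size.toLong))
      .foldLeft(Partial(0L, 0L))(_ merge _)
      .average
  }

  def main(args: Array[String]): Unit =
    Seq(1, 2, 4, 8).foreach { p =>
      println(s"parallelism=$p average=${mergedAverage(p)}")
    }
}
```

In this sketch the merged average is {{50.5}} for every split, which is what I would expect from the job above at any parallelism.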


