You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/15 15:55:55 UTC

[2/4] flink git commit: [FLINK-2292][FLINK-1573] add live per-task accumulators

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
index cfd27f7..a7139b6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -256,6 +257,16 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 		}
 	}
 
+	@Override
+	public void setReporter(AccumulatorRegistry.Reporter reporter) {
+		for (AdaptiveSpanningRecordDeserializer serializer : reader1RecordDeserializers) {
+			serializer.setReporter(reporter);
+		}
+		for (AdaptiveSpanningRecordDeserializer serializer : reader2RecordDeserializers) {
+			serializer.setReporter(reporter);
+		}
+	}
+
 	private class CoBarrierBuffer extends BarrierBuffer {
 
 		private CoBarrierBuffer otherBuffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
index 491dc06..44f9a86 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -57,6 +58,7 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
 
 	private final BarrierBuffer barrierBuffer;
 
+
 	@SuppressWarnings("unchecked")
 	protected StreamingAbstractRecordReader(InputGate inputGate) {
 		super(inputGate);
@@ -80,7 +82,8 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
 				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
 
 				if (result.isBufferConsumed()) {
-					currentRecordDeserializer.getCurrentBuffer().recycle();
+					Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer();
+					currentBuffer.recycle();
 					currentRecordDeserializer = null;
 				}
 
@@ -130,4 +133,12 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
 	public void cleanup() throws IOException {
 		barrierBuffer.cleanup();
 	}
+
+	@Override
+	public void setReporter(AccumulatorRegistry.Reporter reporter) {
+		for (RecordDeserializer<?> deserializer : recordDeserializers) {
+			deserializer.setReporter(reporter);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
index ad74004..1356af5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
@@ -40,4 +40,5 @@ public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
 	public void clearBuffers() {
 		super.clearBuffers();
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 80239fd..a9ebf5b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import java.io.IOException;
 
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -52,6 +53,11 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 			InputGate inputGate = InputGateFactory.createInputGate(getEnvironment().getAllInputGates());
 			inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
 
+			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+
+			inputs.setReporter(reporter);
+
 			inputs.registerTaskEventListener(getSuperstepListener(), StreamingSuperstep.class);
 
 			recordIterator = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 73f0a89..41ee388 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -24,7 +24,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -59,7 +62,14 @@ public class OutputHandler<OUT> {
 	private Map<Integer, StreamConfig> chainedConfigs;
 	private List<StreamEdge> outEdgesInOrder;
 
-	public OutputHandler(StreamTask<OUT, ?> vertex) {
+	/**
+	 * Counters for the number of records emitted and bytes written.
+	 */
+	protected AccumulatorRegistry.Reporter reporter;
+
+
+	public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap,
+						AccumulatorRegistry.Reporter reporter) {
 
 		// Initialize some fields
 		this.vertex = vertex;
@@ -75,6 +85,8 @@ public class OutputHandler<OUT> {
 
 		this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
 
+		this.reporter = reporter;
+
 		// We iterate through all the out edges from this job vertex and create
 		// a stream output
 		for (StreamEdge outEdge : outEdgesInOrder) {
@@ -82,13 +94,14 @@ public class OutputHandler<OUT> {
 					outEdge,
 					outEdge.getTargetId(),
 					chainedConfigs.get(outEdge.getSourceId()),
-					outEdgesInOrder.indexOf(outEdge));
+					outEdgesInOrder.indexOf(outEdge),
+					reporter);
 			outputMap.put(outEdge, streamOutput);
 		}
 
 		// We create the outer output that will be passed to the first task
 		// in the chain
-		this.outerOutput = createChainedCollector(configuration);
+		this.outerOutput = createChainedCollector(configuration, accumulatorMap);
 		
 		// Add the head operator to the end of the list
 		this.chainedOperators.add(vertex.streamOperator);
@@ -121,7 +134,8 @@ public class OutputHandler<OUT> {
 	 * config
 	 */
 	@SuppressWarnings({"unchecked", "rawtypes"})
-	private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig) {
+	private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
+		Preconditions.checkNotNull(accumulatorMap);
 
 
 		// We create a wrapper that will encapsulate the chained operators and
@@ -141,7 +155,7 @@ public class OutputHandler<OUT> {
 		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
 			Integer output = outputEdge.getTargetId();
 
-			Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
+			Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output), accumulatorMap);
 
 			wrapper.addCollector(outCollector, outputEdge);
 		}
@@ -155,8 +169,8 @@ public class OutputHandler<OUT> {
 			// operator which will be returned and set it up using the wrapper
 			OneInputStreamOperator chainableOperator =
 					chainedTaskConfig.getStreamOperator(vertex.getUserCodeClassLoader());
-			
-			StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig);
+
+			StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
 			vertex.contexts.add(chainedContext);
 			
 			chainableOperator.setup(wrapper, chainedContext);
@@ -188,7 +202,7 @@ public class OutputHandler<OUT> {
 	 * @return The created StreamOutput
 	 */
 	private <T> StreamOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
-			StreamConfig upStreamConfig, int outputIndex) {
+			StreamConfig upStreamConfig, int outputIndex, AccumulatorRegistry.Reporter reporter) {
 
 		StreamRecordSerializer<T> outSerializer = upStreamConfig
 				.getTypeSerializerOut1(vertex.userClassLoader);
@@ -207,6 +221,8 @@ public class OutputHandler<OUT> {
 		RecordWriter<SerializationDelegate<StreamRecord<T>>> output =
 				RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
 
+		output.setReporter(reporter);
+
 		StreamOutput<T> streamOutput = new StreamOutput<T>(output, outSerializationDelegate);
 
 		if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 25fe83d..e5d58d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -18,10 +18,13 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -46,7 +49,11 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	@Override
 	public void registerInputOutput() {
 		super.registerInputOutput();
-		outputHandler = new OutputHandler<OUT>(this);
+
+		final AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+		Map<String, Accumulator<?, ?>> accumulatorMap = registry.getUserMap();
+
+		outputHandler = new OutputHandler<OUT>(this, accumulatorMap, outputHandler.reporter);
 
 		String iterationId = configuration.getIterationId();
 		iterationWaitTime = configuration.getIterationWaitTime();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index f98ed2d..4ffc8f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,10 +25,12 @@ import java.util.Map;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.functors.NotNullPredicate;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -87,13 +89,19 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 		streamOperator = configuration.getStreamOperator(userClassLoader);
 
-		outputHandler = new OutputHandler<OUT>(this);
+		// Create and register Accumulators
+		Environment env = getEnvironment();
+		AccumulatorRegistry accumulatorRegistry = env.getAccumulatorRegistry();
+		Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap();
+		AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+		outputHandler = new OutputHandler<OUT>(this, accumulatorMap, reporter);
 
 		if (streamOperator != null) {
 			// IterationHead and IterationTail don't have an Operator...
 
 			//Create context of the head operator
-			headContext = createRuntimeContext(configuration);
+			headContext = createRuntimeContext(configuration, accumulatorMap);
 			this.contexts.add(headContext);
 			streamOperator.setup(outputHandler.getOutput(), headContext);
 		}
@@ -105,14 +113,14 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		return getEnvironment().getTaskName();
 	}
 
-	public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) {
+	public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map<String, Accumulator<?,?>> accumulatorMap) {
 		Environment env = getEnvironment();
 		String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName();
 
 		KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
 
 		return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(),
-				getExecutionConfig(), statePartitioner, getStateHandleProvider());
+				getExecutionConfig(), statePartitioner, getStateHandleProvider(), accumulatorMap);
 	}
 
 	private StateHandleProvider<Serializable> getStateHandleProvider() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 3efd619..7eff16e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.state.OperatorState;
@@ -55,9 +56,9 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	@SuppressWarnings("unchecked")
 	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
 			ExecutionConfig executionConfig, KeySelector<?, ?> statePartitioner,
-			StateHandleProvider<?> provider) {
+			StateHandleProvider<?> provider, Map<String, Accumulator<?, ?>> accumulatorMap) {
 		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
-				executionConfig, env.getDistributedCacheEntries());
+				executionConfig, env.getDistributedCacheEntries(), accumulatorMap);
 		this.env = env;
 		this.statePartitioner = statePartitioner;
 		this.states = new HashMap<String, StreamOperatorState>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 8c7ffeb..eb49e26 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -25,11 +25,13 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -138,7 +140,7 @@ public class StatefulOperatorTest {
 
 		StreamingRuntimeContext context = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024,
 				new MockInputSplitProvider(), 1024), null, new ExecutionConfig(), partitioner,
-				new LocalStateHandleProvider<Serializable>());
+				new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
 
 		StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 0afe8b5..89ec7dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -24,7 +24,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
@@ -198,6 +200,10 @@ public class BarrierBufferTest {
 			super(inputGate);
 		}
 
+		@Override
+		public void setReporter(AccumulatorRegistry.Reporter reporter) {
+
+		}
 	}
 
 	protected static BufferOrEvent createSuperstep(long id, int channel) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9864115..2092d83 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -82,6 +83,8 @@ public class StreamMockEnvironment implements Environment {
 
 	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
 
+	private final AccumulatorRegistry accumulatorRegistry;
+
 	private final int bufferSize;
 
 	public StreamMockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
@@ -94,6 +97,8 @@ public class StreamMockEnvironment implements Environment {
 		this.ioManager = new IOManagerAsync();
 		this.inputSplitProvider = inputSplitProvider;
 		this.bufferSize = bufferSize;
+
+		this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
 	}
 
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
@@ -262,8 +267,8 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-		// discard, this is only for testing
+	public AccumulatorRegistry getAccumulatorRegistry() {
+		return accumulatorRegistry;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index 344fc7d..0467b5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.util;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -157,7 +159,7 @@ public class MockCoContext<IN1, IN2, OUT> {
 		MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
 		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask",
 				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-				new ExecutionConfig(), null, null);
+				new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>());
 
 		operator.setup(mockContext.collector, runtimeContext);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index fc5079a..0d09c14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.util;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -105,7 +107,7 @@ public class MockContext<IN, OUT> {
 		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
 		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask",
 				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-				new ExecutionConfig(), null, null);
+				new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>());
 
 		operator.setup(mockContext.output, runtimeContext);
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index dafd9a3..764fe5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -19,9 +19,11 @@ package org.apache.flink.streaming.util;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
@@ -38,7 +40,7 @@ public class SourceFunctionUtil<T> {
 		List<T> outputs = new ArrayList<T>();
 		if (sourceFunction instanceof RichFunction) {
 			RuntimeContext runtimeContext =  new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-					new ExecutionConfig(), null, new LocalStateHandleProvider<Serializable>());
+					new ExecutionConfig(), null, new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
 
 			((RichFunction) sourceFunction).open(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
index 6b39734..5bfa49b 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.tez.runtime;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.operators.PactDriver;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
@@ -68,7 +69,8 @@ public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOP
 				getContext().getVertexParallelism(),
 				getContext().getTaskIndex(),
 				getClass().getClassLoader(),
-				new ExecutionConfig());
+				new ExecutionConfig(),
+				new HashMap<String, Accumulator<?, ?>>());
 
 		this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 1b39dbd..d23469e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -70,7 +70,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
 		Plan p = getTestJob();
 		p.setExecutionConfig(new ExecutionConfig());
 		if (p == null) {
-			Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
+			Assert.fail("Error: Cannot obtain Pact plan. Did the test forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
 		}
 		
 		Optimizer pc = new Optimizer(new DataStatistics(), this.config);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 0534178..509b86f 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -85,7 +85,6 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
       scheduler,
       libraryCacheManager,
       _,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -106,7 +105,6 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
         scheduler,
         libraryCacheManager,
         archive,
-        accumulatorManager,
         executionRetries,
         delayBetweenRetries,
         timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index 11d6b83..d7d45fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -127,8 +127,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 			// Add built-in accumulator without convenience function
 			getRuntimeContext().addAccumulator("open-close-counter", this.openCloseCounter);
 
-			// Add custom counter. Didn't find a way to do this with
-			// getAccumulator()
+			// Add custom counter
 			this.distinctWords = new SetAccumulator<StringValue>();
 			this.getRuntimeContext().addAccumulator("distinct-words", distinctWords);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
index 78bbe68..6221c08 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -72,7 +72,7 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase {
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(EXPECTED, resultPath);
 		
-		Integer res = (Integer) getJobExecutionResult().getAccumulatorResult("test");
+		Integer res = getJobExecutionResult().getAccumulatorResult("test");
 		Assert.assertEquals(Integer.valueOf(NUM_ITERATIONS * 6), res);
 	}
 
@@ -117,18 +117,12 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase {
 		private static final long serialVersionUID = 1L;
 		
 		private IntCounter testCounter = new IntCounter();
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			getRuntimeContext().addAccumulator("test", this.testCounter);
-		}
-		
+
 		@Override
 		public void reduce(Iterator<Record> records, Collector<Record> out) {
 			// Compute the sum
 			int sum = 0;
-			
+
 			while (records.hasNext()) {
 				Record r = records.next();
 				Integer value = Integer.parseInt(r.getField(0, StringValue.class).getValue());
@@ -137,6 +131,12 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase {
 			}
 			out.collect(new Record(new StringValue(Integer.toString(sum))));
 		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			getRuntimeContext().addAccumulator("test", this.testCounter);
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
new file mode 100644
index 0000000..84c50a9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.*;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Test the availability of accumulator results during runtime.
+ */
+@SuppressWarnings("serial")
+public class AccumulatorLiveITCase {
+
+	private static ActorSystem system;
+	private static ActorRef jobManager;
+
+	// name of accumulator
+	private static String NAME = "test";
+	// time to wait between changing the accumulator value
+	private static long WAIT_TIME = TaskManager.HEARTBEAT_INTERVAL().toMillis() + 500;
+
+	// number of heartbeat intervals to check
+	private static int NUM_ITERATIONS = 3;
+	// number of retries in case the expected value is not seen
+	private static int NUM_RETRIES = 10;
+
+	private static List<String> inputData = new ArrayList<String>(NUM_ITERATIONS);
+
+
+	@Before
+	public void before() throws Exception {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+		TestingCluster testingCluster = TestingUtils.startTestingCluster(1, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+		jobManager = testingCluster.getJobManager();
+
+		// generate test data
+		for (int i=0; i < NUM_ITERATIONS; i++) {
+			inputData.add(i, String.valueOf(i+1));
+		}
+	}
+
+	@After
+	public void after() throws Exception {
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
+	@Test
+	public void testProgram() throws Exception {
+
+		new JavaTestKit(system) {{
+
+			/** The program **/
+			ExecutionEnvironment env = new PlanExtractor();
+			DataSet<String> input = env.fromCollection(inputData);
+			input
+					.flatMap(new Tokenizer())
+					.flatMap(new WaitingUDF())
+					.output(new WaitingOutputFormat());
+			env.execute();
+
+			/** Extract job graph **/
+			JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
+
+			jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, false), getRef());
+			expectMsgClass(Status.Success.class);
+
+			/* Check for accumulator values */
+			int i = 0, retries = 0;
+			int expectedAccVal = 0;
+			while(i <= NUM_ITERATIONS) {
+				if (retries > 0) {
+					// retry fast
+					Thread.sleep(WAIT_TIME / NUM_RETRIES);
+				} else {
+					// wait for heartbeat interval
+					Thread.sleep(WAIT_TIME);
+				}
+
+				jobManager.tell(new RequestAccumulatorValues(jobGraph.getJobID()), getRef());
+				RequestAccumulatorValuesResponse response =
+						expectMsgClass(RequestAccumulatorValuesResponse.class);
+
+				Map<String, Accumulator<?, ?>> userAccumulators = response.userAccumulators();
+				Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> flinkAccumulators =
+						response.flinkAccumulators();
+
+				if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(i == NUM_ITERATIONS, i, i * 4, flinkAccumulators)) {
+//					System.out.println("Passed round " + i);
+					// We passed this round
+					i += 1;
+					expectedAccVal += i;
+					retries = 0;
+				} else {
+					if (retries < NUM_RETRIES) {
+//						System.out.println("retrying for the " + retries + " time.");
+						// try again
+						retries += 1;
+					} else {
+						fail("Failed in round #" + i + " after " + retries + " retries.");
+					}
+				}
+			}
+
+//			expectMsgClass(new FiniteDuration(10, TimeUnit.SECONDS), JobManagerMessages.JobResultSuccess.class);
+		}};
+	}
+
+	private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?,?>> accumulatorMap) {
+//		System.out.println("checking user accumulators");
+		return accumulatorMap.containsKey(NAME) && expected == ((IntCounter)accumulatorMap.get(NAME)).getLocalValue();
+	}
+
+	private static boolean checkFlinkAccumulators(boolean lastRound, int expectedRecords, int expectedBytes,
+												  Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> accumulatorMap) {
+//		System.out.println("checking flink accumulators");
+		boolean returnValue = false;
+
+		for(Map<AccumulatorRegistry.Metric, Accumulator<?,?>> taskMap : accumulatorMap.values()) {
+			if (taskMap != null) {
+				for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> entry : taskMap.entrySet()) {
+					switch (entry.getKey()) {
+						/**
+						 * The following two cases are for the DataSource and Map task
+						 */
+						case NUM_RECORDS_OUT:
+							if (!lastRound) {
+								assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
+								returnValue = true;
+							}
+							break;
+						case NUM_BYTES_OUT:
+							if (!lastRound) {
+								assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
+								returnValue = true;
+							}
+							break;
+						/**
+						 * The following two cases are for the DataSink task
+						 */
+						case NUM_RECORDS_IN:
+							// only check in the last round, and only for the task whose NUM_RECORDS_OUT is 0 (the DataSink)
+							if (lastRound && ((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue() == 0) {
+								assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
+								returnValue = true;
+							}
+							break;
+						case NUM_BYTES_IN:
+							if (lastRound && ((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue() == 0) {
+								assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
+								returnValue = true;
+							}
+							break;
+						default:
+							fail("Unknown accumulator found.");
+					}
+				}
+			}
+		}
+		return returnValue;
+	}
+
+
+	public static class Tokenizer implements FlatMapFunction<String, String> {
+
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			for (String str : value.split("\n")) {
+				out.collect(str);
+			}
+		}
+	}
+
+	/**
+	 * UDF that waits for at least the heartbeat interval's duration.
+	 */
+	private static class WaitingUDF extends RichFlatMapFunction<String, Integer> {
+
+		private IntCounter counter = new IntCounter();
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator(NAME, counter);
+		}
+
+		@Override
+		public void flatMap(String value, Collector<Integer> out) throws Exception {
+			/* Wait here to check the accumulator value in the meantime */
+			Thread.sleep(WAIT_TIME);
+			int val = Integer.valueOf(value);
+			counter.add(val);
+			out.collect(val);
+		}
+	}
+
+	private static class WaitingOutputFormat implements OutputFormat<Integer> {
+
+		@Override
+		public void configure(Configuration parameters) {
+
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
+
+		}
+
+		@Override
+		public void writeRecord(Integer record) throws IOException {
+		}
+
+		@Override
+		public void close() throws IOException {
+			try {
+//				System.out.println("starting output task");
+				Thread.sleep(WAIT_TIME);
+			} catch (InterruptedException e) {
+				fail("Interrupted test.");
+			}
+		}
+	}
+
+	/**
+	 * Helpers to generate the JobGraph
+	 */
+	private static JobGraph getOptimizedPlan(Plan plan) {
+		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		OptimizedPlan op = pc.compile(plan);
+		return jgg.compileJobGraph(op);
+	}
+
+	private static class PlanExtractor extends LocalEnvironment {
+
+		private Plan plan = null;
+
+		@Override
+		public JobExecutionResult execute(String jobName) throws Exception {
+			plan = createProgramPlan();
+			return new JobExecutionResult(new JobID(), -1, null);
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 8cfeead..c497a90 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -240,7 +240,6 @@ object ApplicationMaster {
       scheduler,
       libraryCacheManager,
       archiveProps,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -257,7 +256,6 @@ object ApplicationMaster {
         scheduler,
         libraryCacheManager,
         archiver,
-        accumulatorManager,
         executionRetries,
         delayBetweenRetries,
         timeout,