You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/15 15:55:55 UTC
[2/4] flink git commit: [FLINK-2292][FLINK-1573] add live per-task
accumulators
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
index cfd27f7..a7139b6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -256,6 +257,16 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
}
}
+ @Override
+ public void setReporter(AccumulatorRegistry.Reporter reporter) {
+ for (AdaptiveSpanningRecordDeserializer serializer : reader1RecordDeserializers) {
+ serializer.setReporter(reporter);
+ }
+ for (AdaptiveSpanningRecordDeserializer serializer : reader2RecordDeserializers) {
+ serializer.setReporter(reporter);
+ }
+ }
+
private class CoBarrierBuffer extends BarrierBuffer {
private CoBarrierBuffer otherBuffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
index 491dc06..44f9a86 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -57,6 +58,7 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
private final BarrierBuffer barrierBuffer;
+
@SuppressWarnings("unchecked")
protected StreamingAbstractRecordReader(InputGate inputGate) {
super(inputGate);
@@ -80,7 +82,8 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
if (result.isBufferConsumed()) {
- currentRecordDeserializer.getCurrentBuffer().recycle();
+ Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer();
+ currentBuffer.recycle();
currentRecordDeserializer = null;
}
@@ -130,4 +133,12 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
public void cleanup() throws IOException {
barrierBuffer.cleanup();
}
+
+ @Override
+ public void setReporter(AccumulatorRegistry.Reporter reporter) {
+ for (RecordDeserializer<?> deserializer : recordDeserializers) {
+ deserializer.setReporter(reporter);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
index ad74004..1356af5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
@@ -40,4 +40,5 @@ public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
public void clearBuffers() {
super.clearBuffers();
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 80239fd..a9ebf5b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import java.io.IOException;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -52,6 +53,11 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
InputGate inputGate = InputGateFactory.createInputGate(getEnvironment().getAllInputGates());
inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
+ AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+
+ inputs.setReporter(reporter);
+
inputs.registerTaskEventListener(getSuperstepListener(), StreamingSuperstep.class);
recordIterator = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inSerializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 73f0a89..41ee388 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -24,7 +24,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -59,7 +62,14 @@ public class OutputHandler<OUT> {
private Map<Integer, StreamConfig> chainedConfigs;
private List<StreamEdge> outEdgesInOrder;
- public OutputHandler(StreamTask<OUT, ?> vertex) {
+ /**
+ * Counters for the number of records emitted and bytes written.
+ */
+ protected AccumulatorRegistry.Reporter reporter;
+
+
+ public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap,
+ AccumulatorRegistry.Reporter reporter) {
// Initialize some fields
this.vertex = vertex;
@@ -75,6 +85,8 @@ public class OutputHandler<OUT> {
this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
+ this.reporter = reporter;
+
// We iterate through all the out edges from this job vertex and create
// a stream output
for (StreamEdge outEdge : outEdgesInOrder) {
@@ -82,13 +94,14 @@ public class OutputHandler<OUT> {
outEdge,
outEdge.getTargetId(),
chainedConfigs.get(outEdge.getSourceId()),
- outEdgesInOrder.indexOf(outEdge));
+ outEdgesInOrder.indexOf(outEdge),
+ reporter);
outputMap.put(outEdge, streamOutput);
}
// We create the outer output that will be passed to the first task
// in the chain
- this.outerOutput = createChainedCollector(configuration);
+ this.outerOutput = createChainedCollector(configuration, accumulatorMap);
// Add the head operator to the end of the list
this.chainedOperators.add(vertex.streamOperator);
@@ -121,7 +134,8 @@ public class OutputHandler<OUT> {
* config
*/
@SuppressWarnings({"unchecked", "rawtypes"})
- private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig) {
+ private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
+ Preconditions.checkNotNull(accumulatorMap);
// We create a wrapper that will encapsulate the chained operators and
@@ -141,7 +155,7 @@ public class OutputHandler<OUT> {
for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
Integer output = outputEdge.getTargetId();
- Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
+ Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output), accumulatorMap);
wrapper.addCollector(outCollector, outputEdge);
}
@@ -155,8 +169,8 @@ public class OutputHandler<OUT> {
// operator which will be returned and set it up using the wrapper
OneInputStreamOperator chainableOperator =
chainedTaskConfig.getStreamOperator(vertex.getUserCodeClassLoader());
-
- StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig);
+
+ StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
vertex.contexts.add(chainedContext);
chainableOperator.setup(wrapper, chainedContext);
@@ -188,7 +202,7 @@ public class OutputHandler<OUT> {
* @return The created StreamOutput
*/
private <T> StreamOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
- StreamConfig upStreamConfig, int outputIndex) {
+ StreamConfig upStreamConfig, int outputIndex, AccumulatorRegistry.Reporter reporter) {
StreamRecordSerializer<T> outSerializer = upStreamConfig
.getTypeSerializerOut1(vertex.userClassLoader);
@@ -207,6 +221,8 @@ public class OutputHandler<OUT> {
RecordWriter<SerializationDelegate<StreamRecord<T>>> output =
RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
+ output.setReporter(reporter);
+
StreamOutput<T> streamOutput = new StreamOutput<T>(output, outSerializationDelegate);
if (LOG.isTraceEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 25fe83d..e5d58d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -18,10 +18,13 @@
package org.apache.flink.streaming.runtime.tasks;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.streaming.api.collector.StreamOutput;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -46,7 +49,11 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
@Override
public void registerInputOutput() {
super.registerInputOutput();
- outputHandler = new OutputHandler<OUT>(this);
+
+ final AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ Map<String, Accumulator<?, ?>> accumulatorMap = registry.getUserMap();
+
+ outputHandler = new OutputHandler<OUT>(this, accumulatorMap, outputHandler.reporter);
String iterationId = configuration.getIterationId();
iterationWaitTime = configuration.getIterationWaitTime();
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index f98ed2d..4ffc8f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,10 +25,12 @@ import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.functors.NotNullPredicate;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -87,13 +89,19 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
streamOperator = configuration.getStreamOperator(userClassLoader);
- outputHandler = new OutputHandler<OUT>(this);
+ // Create and register Accumulators
+ Environment env = getEnvironment();
+ AccumulatorRegistry accumulatorRegistry = env.getAccumulatorRegistry();
+ Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap();
+ AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+ outputHandler = new OutputHandler<OUT>(this, accumulatorMap, reporter);
if (streamOperator != null) {
// IterationHead and IterationTail don't have an Operator...
//Create context of the head operator
- headContext = createRuntimeContext(configuration);
+ headContext = createRuntimeContext(configuration, accumulatorMap);
this.contexts.add(headContext);
streamOperator.setup(outputHandler.getOutput(), headContext);
}
@@ -105,14 +113,14 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
return getEnvironment().getTaskName();
}
- public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) {
+ public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map<String, Accumulator<?,?>> accumulatorMap) {
Environment env = getEnvironment();
String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName();
KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(),
- getExecutionConfig(), statePartitioner, getStateHandleProvider());
+ getExecutionConfig(), statePartitioner, getStateHandleProvider(), accumulatorMap);
}
private StateHandleProvider<Serializable> getStateHandleProvider() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 3efd619..7eff16e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.state.OperatorState;
@@ -55,9 +56,9 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
@SuppressWarnings("unchecked")
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig, KeySelector<?, ?> statePartitioner,
- StateHandleProvider<?> provider) {
+ StateHandleProvider<?> provider, Map<String, Accumulator<?, ?>> accumulatorMap) {
super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
- executionConfig, env.getDistributedCacheEntries());
+ executionConfig, env.getDistributedCacheEntries(), accumulatorMap);
this.env = env;
this.statePartitioner = statePartitioner;
this.states = new HashMap<String, StreamOperatorState>();
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 8c7ffeb..eb49e26 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -25,11 +25,13 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.functions.KeySelector;
@@ -138,7 +140,7 @@ public class StatefulOperatorTest {
StreamingRuntimeContext context = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024,
new MockInputSplitProvider(), 1024), null, new ExecutionConfig(), partitioner,
- new LocalStateHandleProvider<Serializable>());
+ new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 0afe8b5..89ec7dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -24,7 +24,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
@@ -198,6 +200,10 @@ public class BarrierBufferTest {
super(inputGate);
}
+ @Override
+ public void setReporter(AccumulatorRegistry.Reporter reporter) {
+
+ }
}
protected static BufferOrEvent createSuperstep(long id, int channel) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9864115..2092d83 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -82,6 +83,8 @@ public class StreamMockEnvironment implements Environment {
private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
+ private final AccumulatorRegistry accumulatorRegistry;
+
private final int bufferSize;
public StreamMockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
@@ -94,6 +97,8 @@ public class StreamMockEnvironment implements Environment {
this.ioManager = new IOManagerAsync();
this.inputSplitProvider = inputSplitProvider;
this.bufferSize = bufferSize;
+
+ this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
}
public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
@@ -262,8 +267,8 @@ public class StreamMockEnvironment implements Environment {
}
@Override
- public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
- // discard, this is only for testing
+ public AccumulatorRegistry getAccumulatorRegistry() {
+ return accumulatorRegistry;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index 344fc7d..0467b5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -157,7 +159,7 @@ public class MockCoContext<IN1, IN2, OUT> {
MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask",
new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
- new ExecutionConfig(), null, null);
+ new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>());
operator.setup(mockContext.collector, runtimeContext);
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index fc5079a..0d09c14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -105,7 +107,7 @@ public class MockContext<IN, OUT> {
MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask",
new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
- new ExecutionConfig(), null, null);
+ new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>());
operator.setup(mockContext.output, runtimeContext);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index dafd9a3..764fe5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -19,9 +19,11 @@ package org.apache.flink.streaming.util;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@@ -38,7 +40,7 @@ public class SourceFunctionUtil<T> {
List<T> outputs = new ArrayList<T>();
if (sourceFunction instanceof RichFunction) {
RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
- new ExecutionConfig(), null, new LocalStateHandleProvider<Serializable>());
+ new ExecutionConfig(), null, new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
((RichFunction) sourceFunction).open(new Configuration());
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
index 6b39734..5bfa49b 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
@@ -19,6 +19,7 @@
package org.apache.flink.tez.runtime;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.operators.PactDriver;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
@@ -68,7 +69,8 @@ public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOP
getContext().getVertexParallelism(),
getContext().getTaskIndex(),
getClass().getClassLoader(),
- new ExecutionConfig());
+ new ExecutionConfig(),
+ new HashMap<String, Accumulator<?, ?>>());
this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 1b39dbd..d23469e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -70,7 +70,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
Plan p = getTestJob();
p.setExecutionConfig(new ExecutionConfig());
if (p == null) {
- Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
+ Assert.fail("Error: Cannot obtain Pact plan. Did the test forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
}
Optimizer pc = new Optimizer(new DataStatistics(), this.config);
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 0534178..509b86f 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -85,7 +85,6 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
scheduler,
libraryCacheManager,
_,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,
@@ -106,7 +105,6 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
scheduler,
libraryCacheManager,
archive,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index 11d6b83..d7d45fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -127,8 +127,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
// Add built-in accumulator without convenience function
getRuntimeContext().addAccumulator("open-close-counter", this.openCloseCounter);
- // Add custom counter. Didn't find a way to do this with
- // getAccumulator()
+ // Add custom counter
this.distinctWords = new SetAccumulator<StringValue>();
this.getRuntimeContext().addAccumulator("distinct-words", distinctWords);
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
index 78bbe68..6221c08 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -72,7 +72,7 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase {
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED, resultPath);
- Integer res = (Integer) getJobExecutionResult().getAccumulatorResult("test");
+ Integer res = getJobExecutionResult().getAccumulatorResult("test");
Assert.assertEquals(Integer.valueOf(NUM_ITERATIONS * 6), res);
}
@@ -117,18 +117,12 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase {
private static final long serialVersionUID = 1L;
private IntCounter testCounter = new IntCounter();
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- getRuntimeContext().addAccumulator("test", this.testCounter);
- }
-
+
@Override
public void reduce(Iterator<Record> records, Collector<Record> out) {
// Compute the sum
int sum = 0;
-
+
while (records.hasNext()) {
Record r = records.next();
Integer value = Integer.parseInt(r.getField(0, StringValue.class).getValue());
@@ -137,6 +131,12 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase {
}
out.collect(new Record(new StringValue(Integer.toString(sum))));
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ getRuntimeContext().addAccumulator("test", this.testCounter);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
new file mode 100644
index 0000000..84c50a9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.*;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Test the availability of accumulator results during runtime.
+ */
+@SuppressWarnings("serial")
+public class AccumulatorLiveITCase {
+
+ private static ActorSystem system;
+ private static ActorRef jobManager;
+
+ // name of accumulator
+ private static String NAME = "test";
+ // time to wait between changing the accumulator value
+ private static long WAIT_TIME = TaskManager.HEARTBEAT_INTERVAL().toMillis() + 500;
+
+ // number of heartbeat intervals to check
+ private static int NUM_ITERATIONS = 3;
+ // numer of retries in case the expected value is not seen
+ private static int NUM_RETRIES = 10;
+
+ private static List<String> inputData = new ArrayList<String>(NUM_ITERATIONS);
+
+
+ @Before
+ public void before() throws Exception {
+ system = AkkaUtils.createLocalActorSystem(new Configuration());
+ TestingCluster testingCluster = TestingUtils.startTestingCluster(1, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ jobManager = testingCluster.getJobManager();
+
+ // generate test data
+ for (int i=0; i < NUM_ITERATIONS; i++) {
+ inputData.add(i, String.valueOf(i+1));
+ }
+ }
+
+ @After
+ public void after() throws Exception {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ @Test
+ public void testProgram() throws Exception {
+
+ new JavaTestKit(system) {{
+
+ /** The program **/
+ ExecutionEnvironment env = new PlanExtractor();
+ DataSet<String> input = env.fromCollection(inputData);
+ input
+ .flatMap(new Tokenizer())
+ .flatMap(new WaitingUDF())
+ .output(new WaitingOutputFormat());
+ env.execute();
+
+ /** Extract job graph **/
+ JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
+
+ jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, false), getRef());
+ expectMsgClass(Status.Success.class);
+
+ /* Check for accumulator values */
+ int i = 0, retries = 0;
+ int expectedAccVal = 0;
+ while(i <= NUM_ITERATIONS) {
+ if (retries > 0) {
+ // retry fast
+ Thread.sleep(WAIT_TIME / NUM_RETRIES);
+ } else {
+ // wait for heartbeat interval
+ Thread.sleep(WAIT_TIME);
+ }
+
+ jobManager.tell(new RequestAccumulatorValues(jobGraph.getJobID()), getRef());
+ RequestAccumulatorValuesResponse response =
+ expectMsgClass(RequestAccumulatorValuesResponse.class);
+
+ Map<String, Accumulator<?, ?>> userAccumulators = response.userAccumulators();
+ Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> flinkAccumulators =
+ response.flinkAccumulators();
+
+ if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(i == NUM_ITERATIONS, i, i * 4, flinkAccumulators)) {
+// System.out.println("Passed round " + i);
+ // We passed this round
+ i += 1;
+ expectedAccVal += i;
+ retries = 0;
+ } else {
+ if (retries < NUM_RETRIES) {
+// System.out.println("retrying for the " + retries + " time.");
+ // try again
+ retries += 1;
+ } else {
+ fail("Failed in round #" + i + " after " + retries + " retries.");
+ }
+ }
+ }
+
+// expectMsgClass(new FiniteDuration(10, TimeUnit.SECONDS), JobManagerMessages.JobResultSuccess.class);
+ }};
+ }
+
+ private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?,?>> accumulatorMap) {
+// System.out.println("checking user accumulators");
+ return accumulatorMap.containsKey(NAME) && expected == ((IntCounter)accumulatorMap.get(NAME)).getLocalValue();
+ }
+
+ private static boolean checkFlinkAccumulators(boolean lastRound, int expectedRecords, int expectedBytes,
+ Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> accumulatorMap) {
+// System.out.println("checking flink accumulators");
+ boolean returnValue = false;
+
+ for(Map<AccumulatorRegistry.Metric, Accumulator<?,?>> taskMap : accumulatorMap.values()) {
+ if (taskMap != null) {
+ for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> entry : taskMap.entrySet()) {
+ switch (entry.getKey()) {
+ /**
+ * The following two cases are for the DataSource and Map task
+ */
+ case NUM_RECORDS_OUT:
+ if (!lastRound) {
+ assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
+ returnValue = true;
+ }
+ break;
+ case NUM_BYTES_OUT:
+ if (!lastRound) {
+ assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
+ returnValue = true;
+ }
+ break;
+ /**
+ * The following two cases are for the DataSink task
+ */
+ case NUM_RECORDS_IN:
+ // check if we are in last round and in current task accumulator map
+ if (lastRound && ((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue() == 0) {
+ assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
+ returnValue = true;
+ }
+ break;
+ case NUM_BYTES_IN:
+ if (lastRound && ((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue() == 0) {
+ assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
+ returnValue = true;
+ }
+ break;
+ default:
+ fail("Unknown accumulator found.");
+ }
+ }
+ }
+ }
+ return returnValue;
+ }
+
+
+ public static class Tokenizer implements FlatMapFunction<String, String> {
+
+ @Override
+ public void flatMap(String value, Collector<String> out) throws Exception {
+ for (String str : value.split("\n")) {
+ out.collect(str);
+ }
+ }
+ }
+
+ /**
+ * UDF that waits for at least the heartbeat interval's duration.
+ */
+ private static class WaitingUDF extends RichFlatMapFunction<String, Integer> {
+
+ private IntCounter counter = new IntCounter();
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ getRuntimeContext().addAccumulator(NAME, counter);
+ }
+
+ @Override
+ public void flatMap(String value, Collector<Integer> out) throws Exception {
+ /* Wait here to check the accumulator value in the meantime */
+ Thread.sleep(WAIT_TIME);
+ int val = Integer.valueOf(value);
+ counter.add(val);
+ out.collect(val);
+ }
+ }
+
+ private static class WaitingOutputFormat implements OutputFormat<Integer> {
+
+ @Override
+ public void configure(Configuration parameters) {
+
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+
+ }
+
+ @Override
+ public void writeRecord(Integer record) throws IOException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+// System.out.println("starting output task");
+ Thread.sleep(WAIT_TIME);
+ } catch (InterruptedException e) {
+ fail("Interrupted test.");
+ }
+ }
+ }
+
+ /**
+ * Helpers to generate the JobGraph
+ */
+ private static JobGraph getOptimizedPlan(Plan plan) {
+ Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
+ JobGraphGenerator jgg = new JobGraphGenerator();
+ OptimizedPlan op = pc.compile(plan);
+ return jgg.compileJobGraph(op);
+ }
+
+ private static class PlanExtractor extends LocalEnvironment {
+
+ private Plan plan = null;
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ plan = createProgramPlan();
+ return new JobExecutionResult(new JobID(), -1, null);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 8cfeead..c497a90 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -240,7 +240,6 @@ object ApplicationMaster {
scheduler,
libraryCacheManager,
archiveProps,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,
@@ -257,7 +256,6 @@ object ApplicationMaster {
scheduler,
libraryCacheManager,
archiver,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,