Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/15 15:55:56 UTC
[3/4] flink git commit: [FLINK-2292][FLINK-1573] add live per-task accumulators
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4381fd0..29efc4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
@@ -49,6 +50,12 @@ public class RecordWriter<T extends IOReadableWritable> {
private final int numChannels;
+ /**
+ * Reporter for the number of records emitted and the number of bytes written.
+ */
+ private AccumulatorRegistry.Reporter reporter;
+
/** {@link RecordSerializer} per outgoing channel */
private final RecordSerializer<T>[] serializers;
@@ -81,6 +88,7 @@ public class RecordWriter<T extends IOReadableWritable> {
synchronized (serializer) {
SerializationResult result = serializer.addRecord(record);
+
while (result.isFullBuffer()) {
Buffer buffer = serializer.getCurrentBuffer();
@@ -90,8 +98,18 @@ public class RecordWriter<T extends IOReadableWritable> {
}
buffer = writer.getBufferProvider().requestBufferBlocking();
+ if (reporter != null) {
+ // increase the number of written bytes by the memory segment's size
+ reporter.reportNumBytesOut(buffer.getSize());
+ }
+
result = serializer.setNextBuffer(buffer);
}
+
+ if(reporter != null) {
+ // count number of emitted records
+ reporter.reportNumRecordsOut(1);
+ }
}
}
}
@@ -173,4 +191,14 @@ public class RecordWriter<T extends IOReadableWritable> {
}
}
}
+
+ /**
+ * Sets the reporter on all serializers so they can count emitted records and written bytes.
+ */
+ public void setReporter(AccumulatorRegistry.Reporter reporter) {
+ for(RecordSerializer<?> serializer : serializers) {
+ serializer.setReporter(reporter);
+ }
+ }
+
}
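
The hunks above add live output metrics to RecordWriter: whenever a serialization buffer fills up, the reporter is credited with the buffer's size in bytes, and every call to emit counts one outgoing record. Below is a minimal, self-contained sketch of that counting pattern; ReporterSketch, CountingReporter and WriterSketch are hypothetical stand-ins for AccumulatorRegistry.Reporter and RecordWriter, not Flink classes, and the fixed buffer size is an assumption made only for illustration.

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for AccumulatorRegistry.Reporter, not the Flink interface.
    interface ReporterSketch {
        void reportNumRecordsOut(long records);
        void reportNumBytesOut(long bytes);
    }

    // Thread-safe counters backing the reporter.
    class CountingReporter implements ReporterSketch {
        final AtomicLong recordsOut = new AtomicLong();
        final AtomicLong bytesOut = new AtomicLong();

        public void reportNumRecordsOut(long records) { recordsOut.addAndGet(records); }
        public void reportNumBytesOut(long bytes) { bytesOut.addAndGet(bytes); }
    }

    // Hypothetical writer mimicking the counting pattern in RecordWriter.emit().
    class WriterSketch {
        private static final int BUFFER_SIZE = 32 * 1024;
        private ReporterSketch reporter;
        private int bytesInCurrentBuffer;

        void setReporter(ReporterSketch reporter) { this.reporter = reporter; }

        void emit(byte[] serializedRecord) {
            int offset = 0;
            while (offset < serializedRecord.length) {
                int chunk = Math.min(BUFFER_SIZE - bytesInCurrentBuffer, serializedRecord.length - offset);
                bytesInCurrentBuffer += chunk;
                offset += chunk;
                if (bytesInCurrentBuffer == BUFFER_SIZE) {
                    // a buffer filled up: credit its size, like reporter.reportNumBytesOut(buffer.getSize())
                    if (reporter != null) {
                        reporter.reportNumBytesOut(BUFFER_SIZE);
                    }
                    bytesInCurrentBuffer = 0;
                }
            }
            // one record was emitted, mirroring reporter.reportNumRecordsOut(1)
            if (reporter != null) {
                reporter.reportNumRecordsOut(1);
            }
        }
    }
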
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 9c5fdca..72434e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.iterative.task;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.aggregators.Aggregator;
@@ -52,6 +53,8 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
/**
* The abstract base class for all tasks able to participate in an iteration.
@@ -166,7 +169,7 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(),
- env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig());
+ env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), this.accumulatorMap);
}
// --------------------------------------------------------------------------------------------
@@ -356,8 +359,10 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
- public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
- super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
+ public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+ ExecutionConfig executionConfig,
+ Map<String, Accumulator<?,?>> accumulatorMap) {
+ super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulatorMap);
}
@Override
@@ -375,6 +380,14 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
public <T extends Value> T getPreviousIterationAggregate(String name) {
return (T) getIterationAggregators().getPreviousGlobalAggregate(name);
}
+
+ @Override
+ public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> newAccumulator) {
+ // only add accumulator on first iteration
+ if (inFirstIteration()) {
+ super.addAccumulator(name, newAccumulator);
+ }
+ }
}
}
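
The IterativeRuntimeUdfContext override above makes accumulator registration idempotent across supersteps: the accumulator is added only during the first iteration, so later supersteps keep accumulating into the same instance instead of re-registering it. A minimal sketch of that guard follows; IterationContextSketch is a hypothetical class, not the Flink context.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical iteration-aware context illustrating the first-iteration-only guard.
    class IterationContextSketch {
        private final Map<String, Object> accumulators = new HashMap<>();
        private int superstep = 1;

        boolean inFirstIteration() {
            return superstep == 1;
        }

        void addAccumulator(String name, Object accumulator) {
            // only register in the first superstep; later supersteps keep using the same instance
            if (inFirstIteration()) {
                accumulators.put(name, accumulator);
            }
        }

        void nextSuperstep() {
            superstep++;
        }
    }
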
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index cf02bdf..9cb045f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.slf4j.Logger;
@@ -112,8 +113,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+ AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig,
- userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
+ userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
// sanity check the setup
final int writersIntoStepFunction = this.eventualOutputs.size();
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 85dd5c5..df41672 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This is the abstract base class for every task that can be executed ba a TaskManager.
+ * This is the abstract base class for every task that can be executed by a TaskManager.
* Concrete tasks like the stream vertices of the batch tasks
* (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit from this class.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
deleted file mode 100644
index c824232..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.accumulators;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.util.SerializedValue;
-
-/**
- * This class manages the accumulators for different jobs. Either the jobs are
- * running and new accumulator results have to be merged in, or the jobs are no
- * longer running and the results shall be still available for the client or the
- * web interface. Accumulators for older jobs are automatically removed when new
- * arrive, based on a maximum number of entries.
- *
- * All functions are thread-safe and thus can be called directly from
- * JobManager.
- */
-public class AccumulatorManager {
-
- /** Map of accumulators belonging to recently started jobs */
- private final Map<JobID, JobAccumulators> jobAccumulators = new HashMap<JobID, JobAccumulators>();
-
- private final LinkedList<JobID> lru = new LinkedList<JobID>();
- private int maxEntries;
-
-
- public AccumulatorManager(int maxEntries) {
- this.maxEntries = maxEntries;
- }
-
- /**
- * Merges the new accumulators with the existing accumulators collected for
- * the job.
- */
- public void processIncomingAccumulators(JobID jobID,
- Map<String, Accumulator<?, ?>> newAccumulators) {
- synchronized (this.jobAccumulators) {
-
- JobAccumulators jobAccumulators = this.jobAccumulators.get(jobID);
- if (jobAccumulators == null) {
- jobAccumulators = new JobAccumulators();
- this.jobAccumulators.put(jobID, jobAccumulators);
- cleanup(jobID);
- }
- jobAccumulators.processNew(newAccumulators);
- }
- }
-
- public Map<String, Object> getJobAccumulatorResults(JobID jobID) {
- Map<String, Object> result = new HashMap<String, Object>();
-
- JobAccumulators acc;
- synchronized (jobAccumulators) {
- acc = jobAccumulators.get(jobID);
- }
-
- if (acc != null) {
- for (Map.Entry<String, Accumulator<?, ?>> entry : acc.getAccumulators().entrySet()) {
- result.put(entry.getKey(), entry.getValue().getLocalValue());
- }
- }
-
- return result;
- }
-
- public Map<String, SerializedValue<Object>> getJobAccumulatorResultsSerialized(JobID jobID) throws IOException {
- JobAccumulators acc;
- synchronized (jobAccumulators) {
- acc = jobAccumulators.get(jobID);
- }
-
- if (acc == null || acc.getAccumulators().isEmpty()) {
- return Collections.emptyMap();
- }
-
- Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
- for (Map.Entry<String, Accumulator<?, ?>> entry : acc.getAccumulators().entrySet()) {
- result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
- }
-
- return result;
- }
-
- public StringifiedAccumulatorResult[] getJobAccumulatorResultsStringified(JobID jobID) throws IOException {
- JobAccumulators acc;
- synchronized (jobAccumulators) {
- acc = jobAccumulators.get(jobID);
- }
-
- if (acc == null || acc.getAccumulators().isEmpty()) {
- return new StringifiedAccumulatorResult[0];
- }
-
- Map<String, Accumulator<?, ?>> accMap = acc.getAccumulators();
-
- StringifiedAccumulatorResult[] result = new StringifiedAccumulatorResult[accMap.size()];
- int i = 0;
- for (Map.Entry<String, Accumulator<?, ?>> entry : accMap.entrySet()) {
- String type = entry.getValue() == null ? "(null)" : entry.getValue().getClass().getSimpleName();
- String value = entry.getValue() == null ? "(null)" : entry.getValue().toString();
- result[i++] = new StringifiedAccumulatorResult(entry.getKey(), type, value);
- }
- return result;
- }
-
- /**
- * Cleanup data for the oldest jobs if the maximum number of entries is reached.
- *
- * @param jobId The (potentially new) JobId.
- */
- private void cleanup(JobID jobId) {
- if (!lru.contains(jobId)) {
- lru.addFirst(jobId);
- }
- if (lru.size() > this.maxEntries) {
- JobID toRemove = lru.removeLast();
- this.jobAccumulators.remove(toRemove);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
deleted file mode 100644
index 970d993..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.accumulators;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-
-/**
- * Simple class wrapping a map of accumulators for a single job. Just for better
- * handling.
- */
-public class JobAccumulators {
-
- private final Map<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
-
- public Map<String, Accumulator<?, ?>> getAccumulators() {
- return this.accumulators;
- }
-
- public void processNew(Map<String, Accumulator<?, ?>> newAccumulators) {
- AccumulatorHelper.mergeInto(this.accumulators, newAccumulators);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 345e1ab..b3130a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
@@ -356,6 +357,11 @@ public class DataSinkTask<IT> extends AbstractInvokable {
throw new Exception("Illegal input group size in task configuration: " + groupSize);
}
+ final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
+ final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+ inputReader.setReporter(reporter);
+
this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
@SuppressWarnings({ "rawtypes" })
final MutableObjectIterator<?> iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 0bbe4bf..3f1c642 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -39,9 +40,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
/**
@@ -187,25 +188,22 @@ public class DataSourceTask<OT> extends AbstractInvokable {
format.close();
}
} // end for all input splits
-
+
// close the collector. if it is a chaining task collector, it will close its chained tasks
this.output.close();
-
+
// close all chained tasks letting them report failure
RegularPactTask.closeChainedTasks(this.chainedTasks, this);
-
- // Merge and report accumulators
- RegularPactTask.reportAndClearAccumulators(getEnvironment(),
- new HashMap<String, Accumulator<?,?>>(), chainedTasks);
+
}
catch (Exception ex) {
// close the input, but do not report any exceptions, since we already have another root cause
try {
this.format.close();
} catch (Throwable ignored) {}
-
+
RegularPactTask.cancelChainedTasks(this.chainedTasks);
-
+
ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
if (ex instanceof CancelTaskException) {
@@ -275,7 +273,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
catch (Throwable t) {
throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
}
-
+
// get the factory for the type serializer
this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);
}
@@ -287,7 +285,14 @@ public class DataSourceTask<OT> extends AbstractInvokable {
private void initOutputs(ClassLoader cl) throws Exception {
this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
this.eventualOutputs = new ArrayList<RecordWriter<?>>();
- this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs, getExecutionConfig());
+
+ final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
+ final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+ Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap();
+
+ this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs,
+ getExecutionConfig(), reporter, accumulatorMap);
}
// ------------------------------------------------------------------------
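
In DataSourceTask the per-task AccumulatorRegistry now serves both kinds of accumulators: getReadWriteReporter() feeds the framework's byte/record counters, while getUserMap() supplies the shared map that user-defined accumulators are registered into, and both are handed down into initOutputs. The following is a self-contained sketch of such a registry; TaskRegistrySketch and its nested Reporter are hypothetical simplifications of the classes in org.apache.flink.runtime.accumulators.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical registry bundling framework counters and user accumulators for one task.
    class TaskRegistrySketch {
        // user-defined accumulators, keyed by name; shared with every runtime UDF context of the task
        private final Map<String, Object> userAccumulators = new ConcurrentHashMap<>();

        // framework counters fed by readers and writers through the reporter
        private final AtomicLong numRecordsOut = new AtomicLong();
        private final AtomicLong numBytesOut = new AtomicLong();

        interface Reporter {
            void reportNumRecordsOut(long n);
            void reportNumBytesOut(long n);
        }

        Map<String, Object> getUserMap() {
            return userAccumulators;
        }

        Reporter getReadWriteReporter() {
            return new Reporter() {
                public void reportNumRecordsOut(long n) { numRecordsOut.addAndGet(n); }
                public void reportNumBytesOut(long n) { numBytesOut.addAndGet(n); }
            };
        }
    }
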
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
index d0f4116..a53f5bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
@@ -74,7 +74,7 @@ public interface PactDriver<S extends Function, OT> {
* code typically signal situations where this instance in unable to proceed, exceptions
* from the user code should be forwarded.
*/
- void run() throws Exception;
+ void run() throws Exception;
/**
* This method is invoked in any case (clean termination and exception) at the end of the tasks operation.
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index b296506..bc23fa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -50,7 +50,7 @@ public interface PactTaskContext<S, OT> {
MemoryManager getMemoryManager();
IOManager getIOManager();
-
+
<X> MutableObjectIterator<X> getInput(int index);
<X> TypeSerializerFactory<X> getInputSerializer(int index);
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 1c3328e..78bf383 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -20,18 +20,17 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
@@ -71,7 +70,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -114,7 +112,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
protected List<RecordWriter<?>> eventualOutputs;
/**
- * The input readers to this task.
+ * The input readers of this task.
*/
protected MutableReader<?>[] inputReaders;
@@ -212,7 +210,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected volatile boolean running = true;
-
+ /**
+ * The accumulator map used in the RuntimeContext.
+ */
+ protected Map<String, Accumulator<?,?>> accumulatorMap;
+
// --------------------------------------------------------------------------------------------
// Task Interface
// --------------------------------------------------------------------------------------------
@@ -273,7 +275,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
LOG.debug(formatLogString("Start task code."));
}
- this.runtimeUdfContext = createRuntimeContext(getEnvironment().getTaskName());
+ Environment env = getEnvironment();
+
+ this.runtimeUdfContext = createRuntimeContext(env.getTaskName());
// whatever happens in this scope, make sure that the local strategies are cleaned up!
// note that the initialization of the local strategies is in the try-finally block as well,
@@ -367,6 +371,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
clearReaders(inputReaders);
clearWriters(eventualOutputs);
+
}
if (this.running) {
@@ -505,18 +510,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// close all chained tasks letting them report failure
RegularPactTask.closeChainedTasks(this.chainedTasks, this);
-
- // Collect the accumulators of all involved UDFs and send them to the
- // JobManager. close() has been called earlier for all involved UDFs
- // (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
- // modify accumulators;
-
- // collect the counters from the udf in the core driver
- Map<String, Accumulator<?, ?>> accumulators =
- FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
-
- // collect accumulators from chained tasks and report them
- reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
}
catch (Exception ex) {
// close the input, but do not report any exceptions, since we already have another root cause
@@ -557,60 +550,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
}
- /**
- * This method is called at the end of a task, receiving the accumulators of
- * the task and the chained tasks. It merges them into a single map of
- * accumulators and sends them to the JobManager.
- *
- * @param chainedTasks
- * Each chained task might have accumulators which will be merged
- * with the accumulators of the stub.
- */
- protected static void reportAndClearAccumulators(Environment env,
- Map<String, Accumulator<?, ?>> accumulators,
- ArrayList<ChainedDriver<?, ?>> chainedTasks) {
-
- // We can merge here the accumulators from the stub and the chained
- // tasks. Type conflicts can occur here if counters with same name but
- // different type were used.
-
- if (!chainedTasks.isEmpty()) {
- if (accumulators == null) {
- accumulators = new HashMap<String, Accumulator<?, ?>>();
- }
-
- for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
- RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
- if (rc != null) {
- Map<String, Accumulator<?, ?>> chainedAccumulators = rc.getAllAccumulators();
- if (chainedAccumulators != null) {
- AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
- }
- }
- }
- }
-
- // Don't report if the UDF didn't collect any accumulators
- if (accumulators == null || accumulators.size() == 0) {
- return;
- }
-
- // Report accumulators to JobManager
- env.reportAccumulators(accumulators);
-
- // We also clear the accumulators, since stub instances might be reused
- // (e.g. in iterations) and we don't want to count twice. This may not be
- // done before sending
- AccumulatorHelper.resetAndClearAccumulators(accumulators);
-
- for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
- RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
- if (rc != null) {
- AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators());
- }
- }
- }
-
protected void closeLocalStrategiesAndCaches() {
// make sure that all broadcast variable references held by this task are released
@@ -725,6 +664,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
int currentReaderOffset = 0;
+ AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+
for (int i = 0; i < numInputs; i++) {
// ---------------- create the input readers ---------------------
// in case where a logical input unions multiple physical inputs, create a union reader
@@ -744,6 +686,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
throw new Exception("Illegal input group size in task configuration: " + groupSize);
}
+ inputReaders[i].setReporter(reporter);
+
currentReaderOffset += groupSize;
}
this.inputReaders = inputReaders;
@@ -1073,14 +1017,21 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
ClassLoader userCodeClassLoader = getUserCodeClassLoader();
- this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs, this.getExecutionConfig());
+ AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+ this.accumulatorMap = accumulatorRegistry.getUserMap();
+
+ this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs,
+ this.getExecutionConfig(), reporter, this.accumulatorMap);
}
public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
+
return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(),
- env.getDistributedCacheEntries());
+ env.getDistributedCacheEntries(), this.accumulatorMap);
}
// --------------------------------------------------------------------------------------------
@@ -1257,7 +1208,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
* @return The OutputCollector that data produced in this task is submitted to.
*/
public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl,
- List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) throws Exception
+ List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws Exception
{
if (numOutputs == 0) {
return null;
@@ -1286,11 +1237,15 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
final DataDistribution distribution = config.getOutputDataDistribution(i, cl);
final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
-
+
oe = new RecordOutputEmitter(strategy, comparator, partitioner, distribution);
}
- writers.add(new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe));
+ // setup accumulator counters
+ final RecordWriter<Record> recordWriter = new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe);
+ recordWriter.setReporter(reporter);
+
+ writers.add(recordWriter);
}
if (eventualOutputs != null) {
eventualOutputs.addAll(writers);
@@ -1318,12 +1273,18 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
else {
final DataDistribution dataDist = config.getOutputDataDistribution(i, cl);
final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
-
+
final TypeComparator<T> comparator = compFactory.createComparator();
oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist);
}
- writers.add(new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe));
+ final RecordWriter<SerializationDelegate<T>> recordWriter =
+ new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
+
+ // setup live accumulator counters
+ recordWriter.setReporter(reporter);
+
+ writers.add(recordWriter);
}
if (eventualOutputs != null) {
eventualOutputs.addAll(writers);
@@ -1338,7 +1299,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
@SuppressWarnings("unchecked")
public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader cl, TaskConfig config,
- List<ChainedDriver<?, ?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig)
+ List<ChainedDriver<?, ?>> chainedTasksTarget,
+ List<RecordWriter<?>> eventualOutputs,
+ ExecutionConfig executionConfig,
+ AccumulatorRegistry.Reporter reporter,
+ Map<String, Accumulator<?,?>> accumulatorMap)
throws Exception
{
final int numOutputs = config.getNumOutputs();
@@ -1370,12 +1335,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
final TaskConfig chainedStubConf = config.getChainedStubConfig(i);
final String taskName = config.getChainedTaskName(i);
- if (i == numChained -1) {
+ if (i == numChained - 1) {
// last in chain, instantiate the output collector for this task
- previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs());
+ previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter);
}
- ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig);
+ ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig, accumulatorMap);
chainedTasksTarget.add(0, ct);
previous = ct;
@@ -1386,7 +1351,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// else
// instantiate the output collector the default way from this configuration
- return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs);
+ return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs, reporter);
}
// --------------------------------------------------------------------------------------------
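
The changes to getOutputCollector and initOutputs thread one reporter through every RecordWriter the task creates, so all outputs of a task, including the final outputs of chained drivers, contribute to the same live counters; this is what makes the removed end-of-task reportAndClearAccumulators path unnecessary. A condensed sketch of that propagation, using hypothetical types rather than the Flink ones:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical illustration: every writer a task creates is wired to the same shared reporter.
    class OutputSetupSketch {
        interface Reporter {
            void reportNumRecordsOut(long n);
            void reportNumBytesOut(long n);
        }

        static class Writer {
            Reporter reporter;
            void setReporter(Reporter reporter) { this.reporter = reporter; }
        }

        static List<Writer> createWriters(int numOutputs, Reporter reporter) {
            List<Writer> writers = new ArrayList<>();
            for (int i = 0; i < numOutputs; i++) {
                Writer writer = new Writer();
                // mirrors recordWriter.setReporter(reporter) in getOutputCollector
                writer.setReporter(reporter);
                writers.add(writer);
            }
            return writers;
        }
    }
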
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index b4cfa27..ea6cfe3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.operators.chaining;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.execution.Environment;
@@ -28,6 +29,8 @@ import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
+import java.util.Map;
+
/**
* The interface to be implemented by drivers that do not run in an own pact task context, but are chained to other
* tasks.
@@ -50,20 +53,23 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
- AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig)
+ AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
+ Map<String, Accumulator<?,?>> accumulatorMap)
{
this.config = config;
this.taskName = taskName;
this.outputCollector = outputCollector;
this.userCodeClassLoader = userCodeClassLoader;
-
+
+ Environment env = parent.getEnvironment();
+
if (parent instanceof RegularPactTask) {
this.udfContext = ((RegularPactTask<?, ?>) parent).createRuntimeContext(taskName);
} else {
- Environment env = parent.getEnvironment();
this.udfContext = new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
env.getIndexInSubtaskGroup(), userCodeClassLoader, parent.getExecutionConfig(),
- env.getDistributedCacheEntries());
+ env.getDistributedCacheEntries(), accumulatorMap
+ );
}
this.executionConfig = executionConfig;
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
index f4cd354..4b480ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.Future;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
@@ -41,12 +42,14 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {
private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>();
- public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
- super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
+ public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+ ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+ super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
}
- public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks) {
- super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks);
+ public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+ ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
+ super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
}
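
DistributedRuntimeUDFContext now receives the registry's user accumulator map in its constructors, so every UDF context created for a task (including those of chained drivers and iteration heads) registers its accumulators into one shared, task-owned map. A small runnable sketch of a context backed by such a shared map follows; the types and the duplicate-name check are hypothetical, not the Flink behavior.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical runtime context whose accumulators live in a map owned by the task, not the context.
    class UdfContextSketch {
        private final Map<String, Object> sharedAccumulators;

        UdfContextSketch(Map<String, Object> sharedAccumulators) {
            this.sharedAccumulators = sharedAccumulators;
        }

        void addAccumulator(String name, Object accumulator) {
            if (sharedAccumulators.putIfAbsent(name, accumulator) != null) {
                throw new IllegalStateException("Accumulator '" + name + "' already registered");
            }
        }

        // one shared map per task, several contexts (driver, chained drivers) writing into it
        public static void main(String[] args) {
            Map<String, Object> taskLevelMap = new ConcurrentHashMap<>();
            UdfContextSketch driverContext = new UdfContextSketch(taskLevelMap);
            UdfContextSketch chainedContext = new UdfContextSketch(taskLevelMap);
            driverContext.addAccumulator("lines-read", new Object());
            chainedContext.addAccumulator("records-filtered", new Object());
            System.out.println(taskLevelMap.keySet()); // both accumulators visible in one place
        }
    }
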
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 5ab0150..f166c36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.taskmanager;
import akka.actor.ActorRef;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -34,12 +33,10 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Future;
@@ -76,7 +73,9 @@ public class RuntimeEnvironment implements Environment {
private final InputGate[] inputGates;
private final ActorRef jobManagerActor;
-
+
+ private final AccumulatorRegistry accumulatorRegistry;
+
// ------------------------------------------------------------------------
public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId,
@@ -86,6 +85,7 @@ public class RuntimeEnvironment implements Environment {
ClassLoader userCodeClassLoader,
MemoryManager memManager, IOManager ioManager,
BroadcastVariableManager bcVarManager,
+ AccumulatorRegistry accumulatorRegistry,
InputSplitProvider splitProvider,
Map<String, Future<Path>> distCacheEntries,
ResultPartitionWriter[] writers,
@@ -93,7 +93,7 @@ public class RuntimeEnvironment implements Environment {
ActorRef jobManagerActor) {
checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
-
+
this.jobId = checkNotNull(jobId);
this.jobVertexId = checkNotNull(jobVertexId);
this.executionId = checkNotNull(executionId);
@@ -107,6 +107,7 @@ public class RuntimeEnvironment implements Environment {
this.memManager = checkNotNull(memManager);
this.ioManager = checkNotNull(ioManager);
this.bcVarManager = checkNotNull(bcVarManager);
+ this.accumulatorRegistry = checkNotNull(accumulatorRegistry);
this.splitProvider = checkNotNull(splitProvider);
this.distCacheEntries = checkNotNull(distCacheEntries);
this.writers = checkNotNull(writers);
@@ -183,6 +184,11 @@ public class RuntimeEnvironment implements Environment {
}
@Override
+ public AccumulatorRegistry getAccumulatorRegistry() {
+ return accumulatorRegistry;
+ }
+
+ @Override
public InputSplitProvider getInputSplitProvider() {
return splitProvider;
}
@@ -213,20 +219,6 @@ public class RuntimeEnvironment implements Environment {
}
@Override
- public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
- AccumulatorEvent evt;
- try {
- evt = new AccumulatorEvent(getJobID(), accumulators);
- }
- catch (IOException e) {
- throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
- }
-
- ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt);
- jobManagerActor.tell(accResult, ActorRef.noSender());
- }
-
- @Override
public void acknowledgeCheckpoint(long checkpointId) {
acknowledgeCheckpoint(checkpointId, null);
}
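
RuntimeEnvironment no longer pushes a final accumulator map to the JobManager via reportAccumulators; instead it exposes the per-task AccumulatorRegistry, and values are reported continuously, with serialized snapshots travelling separately (the TaskExecutionState change further down shows one such carrier). A sketch of that contract shift, using hypothetical simplified interfaces rather than the Flink Environment:

    import java.util.Map;

    // Hypothetical before/after of the environment contract for accumulators.
    interface OldEnvironmentSketch {
        // push model: one final report when the task finishes
        void reportAccumulators(Map<String, Object> accumulators);
    }

    interface NewEnvironmentSketch {
        // pull model: a task-scoped registry is available for live reporting throughout execution
        AccumulatorRegistryHandle getAccumulatorRegistry();
    }

    // Placeholder for the task-scoped registry (see the registry sketch earlier in this commit).
    class AccumulatorRegistryHandle {
    }
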
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 616998c..13a2ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -100,10 +101,10 @@ public class Task implements Runnable {
/** The class logger. */
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
-
+
/** The tread group that contains all task threads */
private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
-
+
/** For atomic state updates */
private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
@@ -176,13 +177,16 @@ public class Task implements Runnable {
/** The library cache, from which the task can request its required JAR files */
private final LibraryCacheManager libraryCache;
-
+
/** The cache for user-defined files that the invokable requires */
private final FileCache fileCache;
-
+
/** The gateway to the network stack, which handles inputs and produced results */
private final NetworkEnvironment network;
+ /** The registry of this task which enables live reporting of accumulators */
+ private final AccumulatorRegistry accumulatorRegistry;
+
/** The thread that executes the task */
private final Thread executingThread;
@@ -194,10 +198,10 @@ public class Task implements Runnable {
/** atomic flag that makes sure the invokable is canceled exactly once upon error */
private final AtomicBoolean invokableHasBeenCanceled;
-
+
/** The invokable of this task, if initialized */
private volatile AbstractInvokable invokable;
-
+
/** The current execution state of the task */
private volatile ExecutionState executionState = ExecutionState.CREATED;
@@ -245,12 +249,13 @@ public class Task implements Runnable {
this.memoryManager = checkNotNull(memManager);
this.ioManager = checkNotNull(ioManager);
- this.broadcastVariableManager =checkNotNull(bcVarManager);
+ this.broadcastVariableManager = checkNotNull(bcVarManager);
+ this.accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);
this.jobManager = checkNotNull(jobManagerActor);
this.taskManager = checkNotNull(taskManagerActor);
this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout));
-
+
this.libraryCache = checkNotNull(libraryCache);
this.fileCache = checkNotNull(fileCache);
this.network = checkNotNull(networkEnvironment);
@@ -361,6 +366,10 @@ public class Task implements Runnable {
return inputGatesById.get(id);
}
+ public AccumulatorRegistry getAccumulatorRegistry() {
+ return accumulatorRegistry;
+ }
+
public Thread getExecutingThread() {
return executingThread;
}
@@ -499,7 +508,8 @@ public class Task implements Runnable {
Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
taskName, taskNameWithSubtask, subtaskIndex, parallelism,
jobConfiguration, taskConfiguration,
- userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager,
+ userCodeClassLoader, memoryManager, ioManager,
+ broadcastVariableManager, accumulatorRegistry,
splitProvider, distributedCacheEntries,
writers, inputGates, jobManager);
@@ -518,7 +528,7 @@ public class Task implements Runnable {
// get our private reference onto the stack (be safe against concurrent changes)
SerializedValue<StateHandle<?>> operatorState = this.operatorState;
-
+
if (operatorState != null) {
if (invokable instanceof OperatorStateCarrier) {
try {
@@ -553,7 +563,7 @@ public class Task implements Runnable {
if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
-
+
// notify everyone that we switched to running. especially the TaskManager needs
// to know this!
notifyObservers(ExecutionState.RUNNING, null);
@@ -653,7 +663,7 @@ public class Task implements Runnable {
finally {
try {
LOG.info("Freeing task resources for " + taskNameWithSubtask);
-
+
// stop the async dispatcher.
// copy dispatcher reference to stack, against concurrent release
ExecutorService dispatcher = this.asyncCallDispatcher;
@@ -867,15 +877,15 @@ public class Task implements Runnable {
*/
public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
AbstractInvokable invokable = this.invokable;
-
+
if (executionState == ExecutionState.RUNNING && invokable != null) {
if (invokable instanceof CheckpointedOperator) {
-
+
// build a local closure
final CheckpointedOperator checkpointer = (CheckpointedOperator) invokable;
final Logger logger = LOG;
final String taskName = taskNameWithSubtask;
-
+
Runnable runnable = new Runnable() {
@Override
public void run() {
@@ -1038,7 +1048,7 @@ public class Task implements Runnable {
public String toString() {
return getTaskNameWithSubtasks() + " [" + executionState + ']';
}
-
+
// ------------------------------------------------------------------------
// Task Names
// ------------------------------------------------------------------------
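
Task now owns one AccumulatorRegistry per execution attempt: it is created in the constructor from the job and execution attempt IDs and injected into the RuntimeEnvironment, so the registry's lifetime matches the attempt and a restarted attempt starts from fresh counters. A minimal lifecycle sketch with hypothetical placeholder types (string IDs stand in for JobID and ExecutionAttemptID):

    // Hypothetical per-attempt registry lifecycle mirroring the Task / RuntimeEnvironment wiring.
    class TaskAttemptSketch {
        static class AttemptRegistry {
            final String jobId;
            final String executionAttemptId;

            AttemptRegistry(String jobId, String executionAttemptId) {
                this.jobId = jobId;
                this.executionAttemptId = executionAttemptId;
            }
        }

        private final AttemptRegistry accumulatorRegistry;

        TaskAttemptSketch(String jobId, String executionAttemptId) {
            // one registry per execution attempt, like `new AccumulatorRegistry(jobId, executionId)`;
            // a restarted attempt gets a fresh registry, so partial counts never leak across attempts
            this.accumulatorRegistry = new AttemptRegistry(jobId, executionAttemptId);
        }

        AttemptRegistry getAccumulatorRegistry() {
            return accumulatorRegistry;
        }
    }
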
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 6c85ab5..0637017 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
import java.util.Arrays;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.api.common.JobID;
@@ -52,8 +53,11 @@ public class TaskExecutionState implements java.io.Serializable {
// class may not be part of the system class loader.
private transient Throwable cachedError;
+ /** Serialized flink and user-defined accumulators */
+ private final AccumulatorSnapshot accumulators;
+
/**
- * Creates a new task execution state update, with no attached exception.
+ * Creates a new task execution state update, with no attached exception and no accumulators.
*
* @param jobID
* the ID of the job the task belongs to
@@ -63,13 +67,28 @@ public class TaskExecutionState implements java.io.Serializable {
* the execution state to be reported
*/
public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState) {
- this(jobID, executionId, executionState, null);
+ this(jobID, executionId, executionState, null, null);
}
-
+
+ /**
+ * Creates a new task execution state update, with an attached exception but no accumulators.
+ *
+ * @param jobID
+ * the ID of the job the task belongs to
+ * @param executionId
+ * the ID of the task execution whose state is to be reported
+ * @param executionState
+ * the execution state to be reported
+ */
+ public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId,
+ ExecutionState executionState, Throwable error) {
+ this(jobID, executionId, executionState, error, null);
+ }
+
/**
* Creates a new task execution state update, with an attached exception.
* This constructor may never throw an exception.
- *
+ *
* @param jobID
* the ID of the job the task belongs to
* @param executionId
@@ -78,11 +97,15 @@ public class TaskExecutionState implements java.io.Serializable {
* the execution state to be reported
* @param error
* an optional error
+ * @param accumulators
+ * The flink and user-defined accumulators which may be null.
*/
public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId,
- ExecutionState executionState, Throwable error) {
+ ExecutionState executionState, Throwable error,
+ AccumulatorSnapshot accumulators) {
- if (jobID == null || executionId == null || executionState == null) {
+
+ if (jobID == null || executionId == null || executionState == null) {
throw new NullPointerException();
}
@@ -90,6 +113,7 @@ public class TaskExecutionState implements java.io.Serializable {
this.executionId = executionId;
this.executionState = executionState;
this.cachedError = error;
+ this.accumulators = accumulators;
if (error != null) {
byte[] serializedError;
@@ -178,6 +202,13 @@ public class TaskExecutionState implements java.io.Serializable {
return this.jobID;
}
+ /**
+ * Gets flink and user-defined accumulators in serialized form.
+ */
+ public AccumulatorSnapshot getAccumulators() {
+ return accumulators;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
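
TaskExecutionState gains an optional AccumulatorSnapshot, so accumulator values travel to the JobManager inside the regular state-transition message instead of through a separate report. A compact sketch of a state update carrying a nullable snapshot; StateUpdateSketch and its byte-array map are hypothetical simplifications, not the Flink classes.

    import java.io.Serializable;
    import java.util.Map;

    // Hypothetical simplified state-update message that can carry an optional accumulator snapshot.
    class StateUpdateSketch implements Serializable {
        enum State { RUNNING, FINISHED, FAILED }

        private final State state;
        private final Map<String, byte[]> serializedAccumulators; // may be null

        StateUpdateSketch(State state, Map<String, byte[]> serializedAccumulators) {
            if (state == null) {
                throw new NullPointerException("state must not be null");
            }
            this.state = state;
            this.serializedAccumulators = serializedAccumulators;
        }

        // convenience constructor for updates without accumulators, mirroring the new overloads
        StateUpdateSketch(State state) {
            this(state, null);
        }

        State getState() {
            return state;
        }

        Map<String, byte[]> getAccumulators() {
            return serializedAccumulators;
        }
    }
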
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
index f5e897b..6a5468a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
@@ -29,7 +29,7 @@ import java.util.Arrays;
* special class loader, the deserialization fails with a {@code ClassNotFoundException}.
*
* To work around that issue, the SerializedValue serialized data immediately into a byte array.
- * When send through RPC or another service that uses serialization, the only the byte array is
+ * When sent through RPC or another service that uses serialization, only the byte array is
* transferred. The object is deserialized later (upon access) and requires the accessor to
* provide the corresponding class loader.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3b4ce15..8823041 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -29,39 +29,39 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
-import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
-import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
-import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.util.{EnvironmentInformation, SerializedValue, ZooKeeperUtil}
-import org.apache.flink.runtime.{ActorLogMessages, ActorSynchronousLogging, StreamingMode}
+import org.apache.flink.runtime.util.ZooKeeperUtil
+import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
-import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.postfixOps
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
/**
* The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the
@@ -97,7 +97,6 @@ class JobManager(
protected val scheduler: FlinkScheduler,
protected val libraryCacheManager: BlobLibraryCacheManager,
protected val archive: ActorRef,
- protected val accumulatorManager: AccumulatorManager,
protected val defaultExecutionRetries: Int,
protected val delayBetweenRetries: Long,
protected val timeout: FiniteDuration,
@@ -221,7 +220,6 @@ class JobManager(
originalSender ! result
}(context.dispatcher)
- sender ! true
case None => log.error("Cannot find execution graph for ID " +
s"${taskExecutionState.getJobID} to change state to " +
s"${taskExecutionState.getExecutionState}.")
@@ -298,7 +296,7 @@ class JobManager(
newJobStatus match {
case JobStatus.FINISHED =>
val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
- accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
+ executionGraph.getAccumulatorsSerialized
} catch {
case e: Exception =>
log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
@@ -402,13 +400,22 @@ class JobManager(
import scala.collection.JavaConverters._
sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
- case Heartbeat(instanceID, metricsReport) =>
- try {
- log.debug(s"Received hearbeat message from $instanceID.")
- instanceManager.reportHeartBeat(instanceID, metricsReport)
- } catch {
- case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t)
- }
+ case Heartbeat(instanceID, metricsReport, accumulators) =>
+ log.debug(s"Received heartbeat message from $instanceID.")
+
+ Future {
+ accumulators foreach {
+ case accumulators =>
+ currentJobs.get(accumulators.getJobID) match {
+ case Some((jobGraph, jobInfo)) =>
+ jobGraph.updateAccumulators(accumulators)
+ case None =>
+ // ignore accumulator values for old job
+ }
+ }
+ }(context.dispatcher)
+
+ instanceManager.reportHeartBeat(instanceID, metricsReport)
case message: AccumulatorMessage => handleAccumulatorMessage(message)
@@ -676,33 +683,18 @@ class JobManager(
* @param message The accumulator message.
*/
private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = {
-
message match {
- case ReportAccumulatorResult(jobId, _, accumulatorEvent) =>
- val classLoader = try {
- libraryCacheManager.getClassLoader(jobId)
- } catch {
- case e: Exception =>
- log.error("Dropping accumulators. No class loader available for job " + jobId, e)
- return
- }
-
- if (classLoader != null) {
- try {
- val accumulators = accumulatorEvent.deserializeValue(classLoader)
- accumulatorManager.processIncomingAccumulators(jobId, accumulators)
- }
- catch {
- case e: Exception => log.error("Cannot update accumulators for job " + jobId, e)
- }
- } else {
- log.error("Dropping accumulators. No class loader available for job " + jobId)
- }
case RequestAccumulatorResults(jobID) =>
try {
- val accumulatorValues: java.util.Map[String, SerializedValue[Object]] =
- accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
+ val accumulatorValues: java.util.Map[String, SerializedValue[Object]] = {
+ currentJobs.get(jobID) match {
+ case Some((graph, jobInfo)) =>
+ graph.getAccumulatorsSerialized
+ case None =>
+ null // TODO check also archive
+ }
+ }
sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
}
@@ -714,8 +706,31 @@ class JobManager(
case RequestAccumulatorResultsStringified(jobId) =>
try {
- val accumulatorValues: Array[StringifiedAccumulatorResult] =
- accumulatorManager.getJobAccumulatorResultsStringified(jobId)
+ val accumulatorValues: Array[StringifiedAccumulatorResult] = {
+ currentJobs.get(jobId) match {
+ case Some((graph, jobInfo)) =>
+ val accumulators = graph.aggregateUserAccumulators()
+
+ val result: Array[StringifiedAccumulatorResult] = new
+ Array[StringifiedAccumulatorResult](accumulators.size)
+
+ var i = 0
+ accumulators foreach {
+ case (name, accumulator) =>
+ val (typeString, valueString) =
+ if (accumulator != null) {
+ (accumulator.getClass.getSimpleName, accumulator.toString)
+ } else {
+ (null, null)
+ }
+ result(i) = new StringifiedAccumulatorResult(name, typeString, valueString)
+ i += 1
+ }
+ result
+ case None =>
+ null // TODO check also archive
+ }
+ }
sender() ! AccumulatorResultStringsFound(jobId, accumulatorValues)
}
@@ -1058,7 +1073,7 @@ object JobManager {
*/
def createJobManagerComponents(configuration: Configuration)
: (ExecutionContext, InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
- Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = {
+ Props, Int, Long, FiniteDuration, Int) = {
val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -1091,8 +1106,6 @@ object JobManager {
val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
- val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
-
val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
var blobServer: BlobServer = null
@@ -1131,7 +1144,6 @@ object JobManager {
scheduler,
libraryCacheManager,
archiveProps,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,
@@ -1179,7 +1191,6 @@ object JobManager {
scheduler,
libraryCacheManager,
archiveProps,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,
@@ -1199,7 +1210,6 @@ object JobManager {
scheduler,
libraryCacheManager,
archiver,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,
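
Editor's note: the RequestAccumulatorResultsStringified branch above replaces the AccumulatorManager lookup with an in-place stringification of the aggregated user accumulators. A hedged Java rendering of that same loop is sketched below; the wrapper class and method are illustrative, only the StringifiedAccumulatorResult constructor and the accumulator map shape come from the hunk above.

    import java.util.Map;
    import org.apache.flink.api.common.accumulators.Accumulator;
    import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;

    public class AccumulatorStringifySketch {

        // Turn aggregated user accumulators into stringified results, tolerating null entries.
        static StringifiedAccumulatorResult[] stringify(Map<String, Accumulator<?, ?>> accumulators) {
            StringifiedAccumulatorResult[] results =
                    new StringifiedAccumulatorResult[accumulators.size()];
            int i = 0;
            for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
                Accumulator<?, ?> acc = entry.getValue();
                String type = (acc != null) ? acc.getClass().getSimpleName() : null;
                String value = (acc != null) ? acc.toString() : null;
                results[i++] = new StringifiedAccumulatorResult(entry.getKey(), type, value);
            }
            return results;
        }
    }
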
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index b12f1b5..6cb571c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.messages
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.instance.InstanceID
/**
@@ -52,8 +53,10 @@ object TaskManagerMessages {
*
* @param instanceID The instance ID of the reporting TaskManager.
* @param metricsReport utf-8 encoded JSON metrics report from the metricRegistry.
+ * @param accumulators Accumulators of tasks serialized as Tuple2[internal, user-defined]
*/
- case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte])
+ case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte],
+ accumulators: Seq[AccumulatorSnapshot])
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
index 82c4ab6..015c96e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
@@ -19,8 +19,7 @@
package org.apache.flink.runtime.messages.accumulators
import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.accumulators.{StringifiedAccumulatorResult, AccumulatorEvent}
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
import org.apache.flink.runtime.util.SerializedValue
/**
@@ -38,18 +37,6 @@ sealed trait AccumulatorMessage {
sealed trait AccumulatorResultsResponse extends AccumulatorMessage
/**
- * Reports the accumulator results of the individual tasks to the job manager.
- *
- * @param jobID The ID of the job the accumulator belongs to
- * @param executionId The ID of the task execution that the accumulator belongs to.
- * @param accumulatorEvent The serialized accumulators
- */
-case class ReportAccumulatorResult(jobID: JobID,
- executionId: ExecutionAttemptID,
- accumulatorEvent: AccumulatorEvent)
- extends AccumulatorMessage
-
-/**
* Requests the accumulator results of the job identified by [[jobID]] from the job manager.
* The result is sent back to the sender as a [[AccumulatorResultsResponse]] message.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1a35d01..f07fa0c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,6 +37,7 @@ import grizzled.slf4j.Logger
import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
+import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, AccumulatorRegistry}
import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobService, BlobCache}
@@ -68,6 +69,7 @@ import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Success}
import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
import scala.language.postfixOps
@@ -328,7 +330,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
network.getPartitionManager.releasePartitionsProducedBy(executionID)
} catch {
case t: Throwable => killTaskManagerFatal(
- "Fatal leak: Unable to release intermediate result partition data", t)
+ "Fatal leak: Unable to release intermediate result partition data", t)
}
}
@@ -389,7 +391,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
} else {
log.debug(s"Cannot find task to cancel for execution ${executionID})")
sender ! new TaskOperationResult(executionID, false,
- "No task with that execution ID was found.")
+ "No task with that execution ID was found.")
}
case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
@@ -400,7 +402,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
log.debug(s"Cannot find task $taskExecutionId to respond with partition state.")
}
}
- }
+ }
}
/**
@@ -793,12 +795,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// create the task. this does not grab any TaskManager resources or download
// and libraries - the operation does not block
- val execId = tdd.getExecutionId
val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
self, jobManagerActor, config.timeout, libCache, fileCache)
log.info(s"Received task ${task.getTaskNameWithSubtasks}")
-
+
+ val execId = tdd.getExecutionId
// add the task to the map
val prevTask = runningTasks.put(execId, task)
if (prevTask != null) {
@@ -898,22 +900,28 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
val task = runningTasks.remove(executionID)
if (task != null) {
-
- // the task must be in a terminal state
- if (!task.getExecutionState.isTerminal) {
- try {
- task.failExternally(new Exception("Task is being removed from TaskManager"))
- } catch {
- case e: Exception => log.error("Could not properly fail task", e)
- }
+
+ // the task must be in a terminal state
+ if (!task.getExecutionState.isTerminal) {
+ try {
+ task.failExternally(new Exception("Task is being removed from TaskManager"))
+ } catch {
+ case e: Exception => log.error("Could not properly fail task", e)
}
+ }
+
+ log.info(s"Unregistering task and sending final execution state " +
+ s"${task.getExecutionState} to JobManager for task ${task.getTaskName} " +
+ s"(${task.getExecutionId})")
- log.info(s"Unregistering task and sending final execution state " +
- s"${task.getExecutionState} to JobManager for task ${task.getTaskName} " +
- s"(${task.getExecutionId})")
+ val accumulators = {
+ val registry = task.getAccumulatorRegistry
+ registry.getSnapshot
+ }
- self ! UpdateTaskExecutionState(new TaskExecutionState(
- task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
+ self ! UpdateTaskExecutionState(new TaskExecutionState(
+ task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause,
+ accumulators))
}
else {
log.error(s"Cannot find task with ID $executionID to unregister.")
@@ -931,9 +939,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
private def sendHeartbeatToJobManager(): Unit = {
try {
log.debug("Sending heartbeat to JobManager")
- val report: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
- currentJobManager foreach {
- jm => jm ! Heartbeat(instanceID, report)
+ val metricsReport: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
+
+ val accumulatorEvents =
+ scala.collection.mutable.Buffer[AccumulatorSnapshot]()
+
+ runningTasks foreach {
+ case (execID, task) =>
+ val registry = task.getAccumulatorRegistry
+ val accumulators = registry.getSnapshot
+ accumulatorEvents.append(accumulators)
+ }
+
+ currentJobManager foreach {
+ jm => jm ! Heartbeat(instanceID, metricsReport, accumulatorEvents)
}
}
catch {
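
Editor's note: the heartbeat path above now piggybacks one AccumulatorSnapshot per running task onto the message sent to the JobManager. The Java sketch below mirrors that collection loop under stated assumptions: the Task and snapshot accessors are the ones used in this commit, but the import paths and the helper method itself are illustrative, not part of the patch.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    import org.apache.flink.runtime.taskmanager.Task;

    public class HeartbeatAccumulatorSketch {

        // Collect one accumulator snapshot per running task for the next heartbeat.
        static List<AccumulatorSnapshot> collectSnapshots(Map<ExecutionAttemptID, Task> runningTasks) {
            List<AccumulatorSnapshot> snapshots =
                    new ArrayList<AccumulatorSnapshot>(runningTasks.size());
            for (Task task : runningTasks.values()) {
                AccumulatorRegistry registry = task.getAccumulatorRegistry();
                snapshots.add(registry.getSnapshot());
            }
            return snapshots;
        }
    }
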
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 4e5fb40..14bf022 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.io.network.api.reader;
+import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -183,5 +185,10 @@ public class AbstractReaderTest {
protected MockReader(InputGate inputGate) {
super(inputGate);
}
+
+ @Override
+ public void setReporter(AccumulatorRegistry.Reporter reporter) {
+
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 9b9609b..0aab5fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -61,7 +61,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
private final IOManager ioManager;
private final MemoryManager memManager;
-
+
private final List<MutableObjectIterator<Record>> inputs;
private final List<TypeComparator<Record>> comparators;
@@ -105,7 +105,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
this.perSortFractionMem = (double)perSortMemory/totalMem;
this.ioManager = new IOManagerAsync();
this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
-
+
this.inputs = new ArrayList<MutableObjectIterator<Record>>();
this.comparators = new ArrayList<TypeComparator<Record>>();
this.sorters = new ArrayList<UnilateralSortMerger<Record>>();
@@ -295,7 +295,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
public IOManager getIOManager() {
return this.ioManager;
}
-
+
@Override
public MemoryManager getMemoryManager() {
return this.memManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0f62b27..b9cb416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -79,6 +80,8 @@ public class MockEnvironment implements Environment {
private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
+ private final AccumulatorRegistry accumulatorRegistry;
+
private final int bufferSize;
public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
@@ -91,6 +94,8 @@ public class MockEnvironment implements Environment {
this.ioManager = new IOManagerAsync();
this.inputSplitProvider = inputSplitProvider;
this.bufferSize = bufferSize;
+
+ this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
}
public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
@@ -259,8 +264,8 @@ public class MockEnvironment implements Environment {
}
@Override
- public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
- // discard, this is only for testing
+ public AccumulatorRegistry getAccumulatorRegistry() {
+ return this.accumulatorRegistry;
}
@Override
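
Editor's note: with this hunk, MockEnvironment hands out an AccumulatorRegistry instead of a reportAccumulators() callback. The snippet below is a rough usage sketch for tests, assuming the three-argument constructor shown above, a no-argument MockInputSplitProvider constructor, and that both test helpers live in org.apache.flink.runtime.operators.testutils; the concrete sizes are arbitrary placeholders.

    import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
    import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;

    public class MockEnvironmentAccumulatorSketch {
        public static void main(String[] args) {
            // Arguments mirror the signature visible in this hunk:
            // (memorySize, inputSplitProvider, bufferSize).
            MockEnvironment env =
                    new MockEnvironment(1024 * 1024, new MockInputSplitProvider(), 4096);

            // The environment now exposes the registry directly.
            AccumulatorRegistry registry = env.getAccumulatorRegistry();

            // Snapshot of the current Flink and user-defined accumulators,
            // as consumed by the TaskManager heartbeat elsewhere in this commit.
            AccumulatorSnapshot snapshot = registry.getSnapshot();
            System.out.println(snapshot);
        }
    }
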
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index e9e761c..bd36dd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
import com.google.common.collect.Maps;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 219e5ae..f2535fa 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -65,7 +65,6 @@ class TestingCluster(userConfiguration: Configuration,
scheduler,
libraryCacheManager,
_,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,
@@ -82,7 +81,6 @@ class TestingCluster(userConfiguration: Configuration,
scheduler,
libraryCacheManager,
archive,
- accumulatorManager,
executionRetries,
delayBetweenRetries,
timeout,
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 5747b7e..6d316ca 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -147,6 +147,16 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
case None => sender ! WorkingTaskManager(None)
}
+ case RequestAccumulatorValues(jobID) =>
+
+ val (flinkAccumulators, userAccumulators) = currentJobs.get(jobID) match {
+ case Some((graph, jobInfo)) =>
+ (graph.getFlinkAccumulators, graph.aggregateUserAccumulators)
+ case None => null
+ }
+
+ sender ! RequestAccumulatorValuesResponse(jobID, flinkAccumulators, userAccumulators)
+
case NotifyWhenJobStatus(jobID, state) =>
val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 241c6c0..46e8486 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.instance.InstanceGateway
import org.apache.flink.runtime.jobgraph.JobStatus
+import java.util.Map
+import org.apache.flink.api.common.accumulators.Accumulator
object TestingJobManagerMessages {
@@ -53,4 +56,9 @@ object TestingJobManagerMessages {
case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
case class TaskManagerTerminated(taskManager: ActorRef)
+
+ case class RequestAccumulatorValues(jobID: JobID)
+ case class RequestAccumulatorValuesResponse(jobID: JobID,
+ flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
+ userAccumulators: Map[String, Accumulator[_,_]])
}