You are viewing a plain-text version of this content. The canonical link to the original archived message was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/10 14:04:46 UTC
[4/4] flink git commit: [runtime] Accumulators are reported through the RuntimeEnvironment, not directly sent as an actor message
[runtime] Accumulators are reported through the RuntimeEnvironment, not directly sent as an actor message
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/201bea3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/201bea3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/201bea3f
Branch: refs/heads/master
Commit: 201bea3f20bf4d96123bbb562db834297d8e9ec4
Parents: 9ffcdf3
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 8 17:11:44 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 10 12:58:39 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/execution/Environment.java | 38 ++++++++++++++++----
.../runtime/execution/RuntimeEnvironment.java | 19 ++++++++++
.../runtime/operators/RegularPactTask.java | 16 ++++-----
3 files changed, 58 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 503a0b9..1bd4f7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.execution;
import akka.actor.ActorRef;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -80,7 +81,8 @@ public interface Environment {
int getNumberOfSubtasks();
/**
- * Returns the index of this subtask in the subtask group.
+ * Returns the index of this subtask in the subtask group. The index
+ * is between 0 and {@link #getNumberOfSubtasks()} - 1.
*
* @return the index of this subtask in the subtask group
*/
@@ -89,7 +91,8 @@ public interface Environment {
/**
* Returns the input split provider assigned to this environment.
*
- * @return the input split provider or <code>null</code> if no such provider has been assigned to this environment.
+ * @return The input split provider or {@code null} if no such
+ * provider has been assigned to this environment.
*/
InputSplitProvider getInputSplitProvider();
@@ -114,12 +117,15 @@ public interface Environment {
*/
String getTaskName();
- String getTaskNameWithSubtasks();
-
/**
- * Returns the proxy object for the accumulator protocol.
+ * Returns the name of the task running in this environment, appended
+ * with the subtask indicator, such as "MyTask (3/6)", where
+ * 3 would be ({@link #getIndexInSubtaskGroup()} + 1), and 6 would be
+ * {@link #getNumberOfSubtasks()}.
+ *
+ * @return The name of the task running in this environment, with subtask indicator.
*/
- ActorRef getJobManager();
+ String getTaskNameWithSubtasks();
/**
* Returns the user code class loader
@@ -130,6 +136,17 @@ public interface Environment {
BroadcastVariableManager getBroadcastVariableManager();
+ /**
+ * Reports the given set of accumulators to the JobManager.
+ *
+ * @param accumulators The accumulators to report.
+ */
+ void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators);
+
+ // --------------------------------------------------------------------------------------------
+ // Fields relevant to the I/O system. Should go into Task
+ // --------------------------------------------------------------------------------------------
+
ResultPartitionWriter getWriter(int index);
ResultPartitionWriter[] getAllWriters();
@@ -138,4 +155,13 @@ public interface Environment {
InputGate[] getAllInputGates();
+
+ /**
+ * Returns the proxy object for the accumulator protocol.
+ */
+ // THIS DOES NOT BELONG HERE, THIS TOTALLY BREAKS COMPONENTIZATION.
+ // THE EXECUTED TASKS HAVE BEEN KEPT INDEPENDENT OF ANY RPC OR ACTOR
+ // COMMUNICATION !!!
+ ActorRef getJobManager();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 3fb7493..5416f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -19,8 +19,11 @@
package org.apache.flink.runtime.execution;
import akka.actor.ActorRef;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -38,10 +41,12 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
import org.apache.flink.runtime.taskmanager.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -352,6 +357,20 @@ public class RuntimeEnvironment implements Environment, Runnable {
}
@Override
+ public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
+ AccumulatorEvent evt;
+ try {
+ evt = new AccumulatorEvent(getJobID(), accumulators);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
+ }
+
+ ReportAccumulatorResult accResult = new ReportAccumulatorResult(getJobID(), owner.getExecutionId(), evt);
+ jobManager.tell(accResult, ActorRef.noSender());
+ }
+
+ @Override
public ResultPartitionWriter getWriter(int index) {
checkElementIndex(index, writers.length, "Illegal environment writer request.");
http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 71b8afc..b528f75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.operators;
-import akka.actor.ActorRef;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -32,7 +31,6 @@ import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
@@ -45,7 +43,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
@@ -67,6 +64,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -567,15 +565,17 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
* Each chained task might have accumulators which will be merged
* with the accumulators of the stub.
*/
- protected static void reportAndClearAccumulators(
- Environment env, Map<String, Accumulator<?, ?>> accumulators, ArrayList<ChainedDriver<?, ?>> chainedTasks) {
+ protected static void reportAndClearAccumulators(Environment env,
+ Map<String, Accumulator<?, ?>> accumulators,
+ ArrayList<ChainedDriver<?, ?>> chainedTasks) {
// We can merge here the accumulators from the stub and the chained
// tasks. Type conflicts can occur here if counters with same name but
// different type were used.
for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
- Map<String, Accumulator<?, ?>> chainedAccumulators = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
+ Map<String, Accumulator<?, ?>> chainedAccumulators =
+ FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
}
}
@@ -586,9 +586,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
// Report accumulators to JobManager
- JobManagerMessages.ReportAccumulatorResult accResult = new JobManagerMessages.ReportAccumulatorResult(new
- AccumulatorEvent(env.getJobID(), AccumulatorHelper.copy(accumulators)));
- env.getJobManager().tell(accResult, ActorRef.noSender());
+ env.reportAccumulators(accumulators);
// We also clear the accumulators, since stub instances might be reused
// (e.g. in iterations) and we don't want to count twice. This may not be