You are viewing a plain-text version of this content. The canonical (HTML) version is available from the original mailing-list archive page.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/10 14:04:46 UTC

[4/4] flink git commit: [runtime] Accumulators are reported through the RuntimeEnvironment, not directly sent as an actor message

[runtime] Accumulators are reported through the RuntimeEnvironment, not directly sent as an actor message


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/201bea3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/201bea3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/201bea3f

Branch: refs/heads/master
Commit: 201bea3f20bf4d96123bbb562db834297d8e9ec4
Parents: 9ffcdf3
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 8 17:11:44 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 10 12:58:39 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/execution/Environment.java    | 38 ++++++++++++++++----
 .../runtime/execution/RuntimeEnvironment.java   | 19 ++++++++++
 .../runtime/operators/RegularPactTask.java      | 16 ++++-----
 3 files changed, 58 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 503a0b9..1bd4f7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.execution;
 
 import akka.actor.ActorRef;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -80,7 +81,8 @@ public interface Environment {
 	int getNumberOfSubtasks();
 
 	/**
-	 * Returns the index of this subtask in the subtask group.
+	 * Returns the index of this subtask in the subtask group. The index
+	 * is between 0 and {@link #getNumberOfSubtasks()} - 1.
 	 *
 	 * @return the index of this subtask in the subtask group
 	 */
@@ -89,7 +91,8 @@ public interface Environment {
 	/**
 	 * Returns the input split provider assigned to this environment.
 	 *
-	 * @return the input split provider or <code>null</code> if no such provider has been assigned to this environment.
+	 * @return The input split provider or {@code null} if no such
+	 *         provider has been assigned to this environment.
 	 */
 	InputSplitProvider getInputSplitProvider();
 
@@ -114,12 +117,15 @@ public interface Environment {
 	 */
 	String getTaskName();
 
-	String getTaskNameWithSubtasks();
-
 	/**
-	 * Returns the proxy object for the accumulator protocol.
+	 * Returns the name of the task running in this environment, appended
+	 * with the subtask indicator, such as "MyTask (3/6)", where
+	 * 3 would be ({@link #getIndexInSubtaskGroup()} + 1), and 6 would be
+	 * {@link #getNumberOfSubtasks()}.
+	 *
+	 * @return The name of the task running in this environment, with subtask indicator.
 	 */
-	ActorRef getJobManager();
+	String getTaskNameWithSubtasks();
 
 	/**
 	 * Returns the user code class loader
@@ -130,6 +136,17 @@ public interface Environment {
 
 	BroadcastVariableManager getBroadcastVariableManager();
 
+	/**
+	 * Reports the given set of accumulators to the JobManager.
+	 *
+	 * @param accumulators The accumulators to report.
+	 */
+	void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators);
+
+	// --------------------------------------------------------------------------------------------
+	//  Fields relevant to the I/O system. Should go into Task
+	// --------------------------------------------------------------------------------------------
+
 	ResultPartitionWriter getWriter(int index);
 
 	ResultPartitionWriter[] getAllWriters();
@@ -138,4 +155,13 @@ public interface Environment {
 
 	InputGate[] getAllInputGates();
 
+
+	/**
+	 * Returns the proxy object for the accumulator protocol.
+	 */
+	// THIS DOES NOT BELONG HERE, THIS TOTALLY BREAKS COMPONENTIZATION.
+	// THE EXECUTED TASKS HAVE BEEN KEPT INDEPENDENT OF ANY RPC OR ACTOR
+	// COMMUNICATION !!!
+	ActorRef getJobManager();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 3fb7493..5416f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.execution;
 
 import akka.actor.ActorRef;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorEvent;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -38,10 +41,12 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -352,6 +357,20 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	}
 
 	@Override
+	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
+		AccumulatorEvent evt;
+		try {
+			evt = new AccumulatorEvent(getJobID(), accumulators);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
+		}
+
+		ReportAccumulatorResult accResult = new ReportAccumulatorResult(getJobID(), owner.getExecutionId(), evt);
+		jobManager.tell(accResult, ActorRef.noSender());
+	}
+
+	@Override
 	public ResultPartitionWriter getWriter(int index) {
 		checkElementIndex(index, writers.length, "Illegal environment writer request.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 71b8afc..b528f75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.operators;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -32,7 +31,6 @@ import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorEvent;
 import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -45,7 +43,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
@@ -67,6 +64,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -567,15 +565,17 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 *          Each chained task might have accumulators which will be merged
 	 *          with the accumulators of the stub.
 	 */
-	protected static void reportAndClearAccumulators(
-			Environment env, Map<String, Accumulator<?, ?>> accumulators, ArrayList<ChainedDriver<?, ?>> chainedTasks) {
+	protected static void reportAndClearAccumulators(Environment env,
+													Map<String, Accumulator<?, ?>> accumulators,
+													ArrayList<ChainedDriver<?, ?>> chainedTasks) {
 
 		// We can merge here the accumulators from the stub and the chained
 		// tasks. Type conflicts can occur here if counters with same name but
 		// different type were used.
 		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
 			if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
-				Map<String, Accumulator<?, ?>> chainedAccumulators = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
+				Map<String, Accumulator<?, ?>> chainedAccumulators =
+						FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
 				AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
 			}
 		}
@@ -586,9 +586,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		}
 
 		// Report accumulators to JobManager
-		JobManagerMessages.ReportAccumulatorResult accResult = new JobManagerMessages.ReportAccumulatorResult(new
-				AccumulatorEvent(env.getJobID(), AccumulatorHelper.copy(accumulators)));
-		env.getJobManager().tell(accResult, ActorRef.noSender());
+		env.reportAccumulators(accumulators);
 
 		// We also clear the accumulators, since stub instances might be reused
 		// (e.g. in iterations) and we don't want to count twice. This may not be