You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/26 11:51:56 UTC

[3/3] flink git commit: [runtime] Refactor partition consumable notification

[runtime] Refactor partition consumable notification

Moves the partition consumable notification out of ResultPartition and behind the
new ResultPartitionConsumableNotifier interface. This makes it easier to set up
unit tests, where no job manager is available to notify.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95bdaad3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95bdaad3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95bdaad3

Branch: refs/heads/master
Commit: 95bdaad3b78af9414f2c8bd7e0eae2eafcab055b
Parents: b13c9a7
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 23 14:59:28 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Mar 26 11:51:26 2015 +0100

----------------------------------------------------------------------
 .../runtime/execution/RuntimeEnvironment.java   | 10 ++-
 .../runtime/io/network/NetworkEnvironment.java  | 67 ++++++++++++++++++--
 .../io/network/partition/ResultPartition.java   | 45 ++++---------
 .../ResultPartitionConsumableNotifier.java      | 27 ++++++++
 4 files changed, 112 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95bdaad3/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 96b2f55..c6270b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -125,7 +125,15 @@ public class RuntimeEnvironment implements Environment, Runnable {
 				ResultPartitionDeploymentDescriptor desc = partitions.get(i);
 				ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), owner.getExecutionId());
 
-				this.producedPartitions[i] = new ResultPartition(owner.getJobID(), partitionId, desc.getPartitionType(), desc.getNumberOfSubpartitions(), networkEnvironment, ioManager);
+				this.producedPartitions[i] = new ResultPartition(
+						owner.getJobID(),
+						partitionId,
+						desc.getPartitionType(),
+						desc.getNumberOfSubpartitions(),
+						networkEnvironment.getPartitionManager(),
+						networkEnvironment.getPartitionConsumableNotifier(),
+						ioManager,
+						networkEnvironment.getDefaultIOMode());
 
 				writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/95bdaad3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index e02e744..6a1e8a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -19,7 +19,10 @@
 package org.apache.flink.runtime.io.network;
 
 import akka.actor.ActorRef;
+import akka.dispatch.OnFailure;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -27,20 +30,26 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
 
 /**
  * Network I/O components of each {@link TaskManager} instance.
@@ -63,6 +72,8 @@ public class NetworkEnvironment {
 
 	private final ConnectionManager connectionManager;
 
+	private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
+
 	private final NetworkEnvironmentConfiguration configuration;
 
 	private boolean isShutdown;
@@ -70,9 +81,12 @@ public class NetworkEnvironment {
 	/**
 	 * Initializes all network I/O components.
 	 */
-	public NetworkEnvironment(ActorRef taskManager, ActorRef jobManager,
-							FiniteDuration jobManagerTimeout,
-							NetworkEnvironmentConfiguration config) throws IOException {
+	public NetworkEnvironment(
+			ActorRef taskManager,
+			ActorRef jobManager,
+			FiniteDuration jobManagerTimeout,
+			NetworkEnvironmentConfiguration config) throws IOException {
+
 		this.taskManager = checkNotNull(taskManager);
 		this.jobManager = checkNotNull(jobManager);
 		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
@@ -104,6 +118,8 @@ public class NetworkEnvironment {
 		catch (Throwable t) {
 			throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
 		}
+
+		this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(this);
 	}
 
 	public ActorRef getTaskManager() {
@@ -183,7 +199,7 @@ public class NetworkEnvironment {
 
 	public void unregisterTask(Task task) {
 		LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
-					task.getTaskNameWithSubtasks(), task.getExecutionState());
+				task.getTaskNameWithSubtasks(), task.getExecutionState());
 
 		final ExecutionAttemptID executionId = task.getExecutionId();
 
@@ -235,6 +251,10 @@ public class NetworkEnvironment {
 		return configuration.ioMode();
 	}
 
+	public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
+		return partitionConsumableNotifier;
+	}
+
 	public boolean hasReleasedAllResources() {
 		String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. Task event dispatcher: %d registered writers.",
 				networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(), networkBufferPool.getNumberOfRegisteredBufferPools(), connectionManager.getNumberOfActiveConnections(), taskEventDispatcher.getNumberOfRegisteredWriters());
@@ -296,4 +316,43 @@ public class NetworkEnvironment {
 	public boolean isShutdown() {
 		return isShutdown;
 	}
+
+	/**
+	 * Notifies the job manager about consumable partitions.
+	 */
+	private static class JobManagerResultPartitionConsumableNotifier
+			implements ResultPartitionConsumableNotifier {
+
+		private final NetworkEnvironment networkEnvironment;
+
+		public JobManagerResultPartitionConsumableNotifier(NetworkEnvironment networkEnvironment) {
+			this.networkEnvironment = networkEnvironment;
+		}
+
+		@Override
+		public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId) {
+
+			final ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
+
+			Future<Object> futureResponse = Patterns.ask(
+					networkEnvironment.getJobManager(),
+					msg,
+					networkEnvironment.getJobManagerTimeout());
+
+			futureResponse.onFailure(new OnFailure() {
+				@Override
+				public void onFailure(Throwable failure) throws Throwable {
+					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
+
+					// Fail task at the TaskManager
+					FailTask failMsg = new FailTask(
+							partitionId.getProducerId(),
+							new RuntimeException("Could not schedule or update consumers at " +
+									"the JobManager.", failure));
+
+					networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
+				}
+			}, AkkaUtils.globalExecutionContext());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95bdaad3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 95aa636..ddd47dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,14 +18,10 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import akka.actor.ActorRef;
-import akka.dispatch.OnFailure;
-import akka.pattern.Patterns;
 import com.google.common.base.Optional;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -37,7 +33,6 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,8 +41,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
 
 /**
  * A result partition for data produced by a single task.
@@ -94,7 +87,9 @@ public class ResultPartition implements BufferPoolOwner {
 	/** The subpartitions of this partition. At least one. */
 	private final ResultSubpartition[] subpartitions;
 
-	private final NetworkEnvironment networkEnvironment;
+	private final ResultPartitionManager partitionManager;
+
+	private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
 
 	// - Runtime state --------------------------------------------------------
 
@@ -126,21 +121,24 @@ public class ResultPartition implements BufferPoolOwner {
 			ResultPartitionID partitionId,
 			ResultPartitionType partitionType,
 			int numberOfSubpartitions,
-			NetworkEnvironment networkEnvironment,
-			IOManager ioManager) {
+			ResultPartitionManager partitionManager,
+			ResultPartitionConsumableNotifier partitionConsumableNotifier,
+			IOManager ioManager,
+			IOMode defaultIoMode) {
 
 		this.jobId = checkNotNull(jobId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
 		this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
-		this.networkEnvironment = checkNotNull(networkEnvironment);
+		this.partitionManager = checkNotNull(partitionManager);
+		this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
 
 		// Create the subpartitions.
 		switch (partitionType) {
 			case BLOCKING:
 				for (int i = 0; i < subpartitions.length; i++) {
 					subpartitions[i] = new SpillableSubpartition(
-							i, this, ioManager, networkEnvironment.getDefaultIOMode());
+							i, this, ioManager, defaultIoMode);
 				}
 
 				break;
@@ -375,7 +373,7 @@ public class ResultPartition implements BufferPoolOwner {
 		int refCnt = pendingReferences.decrementAndGet();
 
 		if (refCnt == 0) {
-			networkEnvironment.getPartitionManager().onConsumedPartition(this);
+			partitionManager.onConsumedPartition(this);
 		}
 		else if (refCnt < 0) {
 			throw new IllegalStateException("All references released.");
@@ -396,24 +394,7 @@ public class ResultPartition implements BufferPoolOwner {
 	 */
 	private void notifyPipelinedConsumers() throws IOException {
 		if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) {
-			ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
-
-			Future<Object> futureResponse = Patterns.ask(networkEnvironment.getJobManager(), msg,
-					networkEnvironment.getJobManagerTimeout());
-
-			futureResponse.onFailure(new OnFailure() {
-				@Override
-				public void onFailure(Throwable failure) throws Throwable {
-					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
-
-					// Fail task at the TaskManager
-					FailTask failMsg = new FailTask(partitionId.getProducerId(),
-							new RuntimeException("Could not schedule or update consumers at " +
-									"the JobManager.", failure));
-
-					networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
-				}
-			}, AkkaUtils.globalExecutionContext());
+			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
 
 			hasNotifiedPipelinedConsumers = true;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/95bdaad3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
new file mode 100644
index 0000000..0ea3a1c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.jobgraph.JobID;
+
+public interface ResultPartitionConsumableNotifier {
+
+	void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId);
+
+}