You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/26 11:51:56 UTC
[3/3] flink git commit: [runtime] Refactor partition consumable notification
[runtime] Refactor partition consumable notification
Moves the partition consumable notification out of ResultPartition. This makes
it easier to set up unit tests, in which no JobManager is available to notify.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95bdaad3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95bdaad3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95bdaad3
Branch: refs/heads/master
Commit: 95bdaad3b78af9414f2c8bd7e0eae2eafcab055b
Parents: b13c9a7
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 23 14:59:28 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Mar 26 11:51:26 2015 +0100
----------------------------------------------------------------------
.../runtime/execution/RuntimeEnvironment.java | 10 ++-
.../runtime/io/network/NetworkEnvironment.java | 67 ++++++++++++++++++--
.../io/network/partition/ResultPartition.java | 45 ++++---------
.../ResultPartitionConsumableNotifier.java | 27 ++++++++
4 files changed, 112 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/95bdaad3/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 96b2f55..c6270b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -125,7 +125,15 @@ public class RuntimeEnvironment implements Environment, Runnable {
ResultPartitionDeploymentDescriptor desc = partitions.get(i);
ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), owner.getExecutionId());
- this.producedPartitions[i] = new ResultPartition(owner.getJobID(), partitionId, desc.getPartitionType(), desc.getNumberOfSubpartitions(), networkEnvironment, ioManager);
+ this.producedPartitions[i] = new ResultPartition(
+ owner.getJobID(),
+ partitionId,
+ desc.getPartitionType(),
+ desc.getNumberOfSubpartitions(),
+ networkEnvironment.getPartitionManager(),
+ networkEnvironment.getPartitionConsumableNotifier(),
+ ioManager,
+ networkEnvironment.getDefaultIOMode());
writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/95bdaad3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index e02e744..6a1e8a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -19,7 +19,10 @@
package org.apache.flink.runtime.io.network;
import akka.actor.ActorRef;
+import akka.dispatch.OnFailure;
+import akka.pattern.Patterns;
import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -27,20 +30,26 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
+import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
/**
* Network I/O components of each {@link TaskManager} instance.
@@ -63,6 +72,8 @@ public class NetworkEnvironment {
private final ConnectionManager connectionManager;
+ private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
+
private final NetworkEnvironmentConfiguration configuration;
private boolean isShutdown;
@@ -70,9 +81,12 @@ public class NetworkEnvironment {
/**
* Initializes all network I/O components.
*/
- public NetworkEnvironment(ActorRef taskManager, ActorRef jobManager,
- FiniteDuration jobManagerTimeout,
- NetworkEnvironmentConfiguration config) throws IOException {
+ public NetworkEnvironment(
+ ActorRef taskManager,
+ ActorRef jobManager,
+ FiniteDuration jobManagerTimeout,
+ NetworkEnvironmentConfiguration config) throws IOException {
+
this.taskManager = checkNotNull(taskManager);
this.jobManager = checkNotNull(jobManager);
this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
@@ -104,6 +118,8 @@ public class NetworkEnvironment {
catch (Throwable t) {
throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
}
+
+ this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(this);
}
public ActorRef getTaskManager() {
@@ -183,7 +199,7 @@ public class NetworkEnvironment {
public void unregisterTask(Task task) {
LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
- task.getTaskNameWithSubtasks(), task.getExecutionState());
+ task.getTaskNameWithSubtasks(), task.getExecutionState());
final ExecutionAttemptID executionId = task.getExecutionId();
@@ -235,6 +251,10 @@ public class NetworkEnvironment {
return configuration.ioMode();
}
+ public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
+ return partitionConsumableNotifier;
+ }
+
public boolean hasReleasedAllResources() {
String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. Task event dispatcher: %d registered writers.",
networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(), networkBufferPool.getNumberOfRegisteredBufferPools(), connectionManager.getNumberOfActiveConnections(), taskEventDispatcher.getNumberOfRegisteredWriters());
@@ -296,4 +316,43 @@ public class NetworkEnvironment {
public boolean isShutdown() {
return isShutdown;
}
+
+ /**
+ * Notifies the job manager about consumable partitions.
+ */
+ private static class JobManagerResultPartitionConsumableNotifier
+ implements ResultPartitionConsumableNotifier {
+
+ private final NetworkEnvironment networkEnvironment;
+
+ public JobManagerResultPartitionConsumableNotifier(NetworkEnvironment networkEnvironment) {
+ this.networkEnvironment = networkEnvironment;
+ }
+
+ @Override
+ public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId) {
+
+ final ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
+
+ Future<Object> futureResponse = Patterns.ask(
+ networkEnvironment.getJobManager(),
+ msg,
+ networkEnvironment.getJobManagerTimeout());
+
+ futureResponse.onFailure(new OnFailure() {
+ @Override
+ public void onFailure(Throwable failure) throws Throwable {
+ LOG.error("Could not schedule or update consumers at the JobManager.", failure);
+
+ // Fail task at the TaskManager
+ FailTask failMsg = new FailTask(
+ partitionId.getProducerId(),
+ new RuntimeException("Could not schedule or update consumers at " +
+ "the JobManager.", failure));
+
+ networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
+ }
+ }, AkkaUtils.globalExecutionContext());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/95bdaad3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 95aa636..ddd47dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,14 +18,10 @@
package org.apache.flink.runtime.io.network.partition;
-import akka.actor.ActorRef;
-import akka.dispatch.OnFailure;
-import akka.pattern.Patterns;
import com.google.common.base.Optional;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -37,7 +33,6 @@ import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,8 +41,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
/**
* A result partition for data produced by a single task.
@@ -94,7 +87,9 @@ public class ResultPartition implements BufferPoolOwner {
/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
- private final NetworkEnvironment networkEnvironment;
+ private final ResultPartitionManager partitionManager;
+
+ private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
// - Runtime state --------------------------------------------------------
@@ -126,21 +121,24 @@ public class ResultPartition implements BufferPoolOwner {
ResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
- NetworkEnvironment networkEnvironment,
- IOManager ioManager) {
+ ResultPartitionManager partitionManager,
+ ResultPartitionConsumableNotifier partitionConsumableNotifier,
+ IOManager ioManager,
+ IOMode defaultIoMode) {
this.jobId = checkNotNull(jobId);
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
- this.networkEnvironment = checkNotNull(networkEnvironment);
+ this.partitionManager = checkNotNull(partitionManager);
+ this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
// Create the subpartitions.
switch (partitionType) {
case BLOCKING:
for (int i = 0; i < subpartitions.length; i++) {
subpartitions[i] = new SpillableSubpartition(
- i, this, ioManager, networkEnvironment.getDefaultIOMode());
+ i, this, ioManager, defaultIoMode);
}
break;
@@ -375,7 +373,7 @@ public class ResultPartition implements BufferPoolOwner {
int refCnt = pendingReferences.decrementAndGet();
if (refCnt == 0) {
- networkEnvironment.getPartitionManager().onConsumedPartition(this);
+ partitionManager.onConsumedPartition(this);
}
else if (refCnt < 0) {
throw new IllegalStateException("All references released.");
@@ -396,24 +394,7 @@ public class ResultPartition implements BufferPoolOwner {
*/
private void notifyPipelinedConsumers() throws IOException {
if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) {
- ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
-
- Future<Object> futureResponse = Patterns.ask(networkEnvironment.getJobManager(), msg,
- networkEnvironment.getJobManagerTimeout());
-
- futureResponse.onFailure(new OnFailure() {
- @Override
- public void onFailure(Throwable failure) throws Throwable {
- LOG.error("Could not schedule or update consumers at the JobManager.", failure);
-
- // Fail task at the TaskManager
- FailTask failMsg = new FailTask(partitionId.getProducerId(),
- new RuntimeException("Could not schedule or update consumers at " +
- "the JobManager.", failure));
-
- networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
- }
- }, AkkaUtils.globalExecutionContext());
+ partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
hasNotifiedPipelinedConsumers = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/95bdaad3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
new file mode 100644
index 0000000..0ea3a1c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.jobgraph.JobID;
+
+public interface ResultPartitionConsumableNotifier {
+
+ void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId);
+
+}