You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:48:55 UTC
[06/13] flink git commit: [FLINK-1350] [runtime] Add blocking result
partition variant
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index e69ef17..c0be611 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -300,7 +300,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
/**
* The reader for the spilled-file of the probe partition that is currently read.
*/
- private BlockChannelReader currentSpilledProbeSide;
+ private BlockChannelReader<MemorySegment> currentSpilledProbeSide;
/**
* The channel enumerator that is used while processing the current partition to create
@@ -802,7 +802,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
segments.add(getNextBuffer());
segments.add(getNextBuffer());
- final BlockChannelReader inReader = this.ioManager.createBlockChannelReader(p.getBuildSideChannel().getChannelID());
+ final BlockChannelReader<MemorySegment> inReader = this.ioManager.createBlockChannelReader(p.getBuildSideChannel().getChannelID());
final ChannelReaderInputView inView = new HeaderlessChannelReaderInputView(inReader, segments,
p.getBuildSideBlockCount(), p.getLastSegmentLimit(), false);
final ChannelReaderInputViewIterator<BT> inIter = new ChannelReaderInputViewIterator<BT>(inView,
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
index 56dcfae..84868ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
@@ -37,7 +37,7 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
private FileIOChannel.ID initialBuildSideChannel = null; // path to initial build side contents (only for in-memory partitions)
- private BlockChannelWriter initialBuildSideWriter = null;
+ private BlockChannelWriter<MemorySegment> initialBuildSideWriter = null;
private boolean isRestored = false; // marks a restored partition
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 3466024..d05bd9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -301,7 +301,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
}
// create writer
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
this.memManager.getPageSize());
@@ -457,7 +457,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// create a new channel writer
final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
registerChannelToBeRemovedAtShudown(mergedChannelID);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers,
this.memManager.getPageSize());
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index a938e98..388b7b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1315,7 +1315,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
registerChannelToBeRemovedAtShudown(channel);
// create writer
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
this.memManager.getPageSize());
@@ -1493,7 +1493,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
final List<MemorySegment> segsForChannel = inputSegments.get(i);
// create a reader. if there are multiple segments for the reader, issue multiple together per I/O request
- final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel.getChannel());
+ final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel.getChannel());
readerList.add(reader);
registerOpenChannelToBeRemovedAtShudown(reader);
@@ -1578,7 +1578,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
// create a new channel writer
final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
registerChannelToBeRemovedAtShudown(mergedChannelID);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers,
this.memManager.getPageSize());
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e3e4175..8cebc6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -23,14 +23,15 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -73,6 +74,8 @@ public class Task {
/** The current execution state of the task */
private volatile ExecutionState executionState = ExecutionState.DEPLOYING;
+ private volatile Throwable failureCause;
+
// --------------------------------------------------------------------------------------------
public Task(JobID jobId, JobVertexID vertexId, int taskIndex, int parallelism,
@@ -160,6 +163,10 @@ public class Task {
}
}
+ public Throwable getFailureCause() {
+ return failureCause;
+ }
+
// ----------------------------------------------------------------------------------------------------------------
// States and Transitions
// ----------------------------------------------------------------------------------------------------------------
@@ -174,7 +181,7 @@ public class Task {
public boolean markAsFinished() {
if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
notifyObservers(ExecutionState.FINISHED, null);
- notifyExecutionStateChange(ExecutionState.FINISHED, null);
+ unregisterTask();
return true;
}
else {
@@ -195,8 +202,11 @@ public class Task {
// after all, we may have recognized our failure state before the cancelling and never sent a canceled
// message back
else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+ this.failureCause = error;
+
notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(error));
- notifyExecutionStateChange(ExecutionState.FAILED, error);
+ unregisterTask();
+
return;
}
}
@@ -218,7 +228,7 @@ public class Task {
if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
notifyObservers(ExecutionState.CANCELED, null);
- notifyExecutionStateChange(ExecutionState.CANCELED, null);
+ unregisterTask();
return;
}
}
@@ -268,9 +278,10 @@ public class Task {
if (current == ExecutionState.DEPLOYING) {
// directly set to canceled
if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+ this.failureCause = cause;
notifyObservers(ExecutionState.FAILED, null);
- notifyExecutionStateChange(ExecutionState.FAILED, cause);
+ unregisterTask();
return;
}
}
@@ -284,8 +295,10 @@ public class Task {
LOG.error("Error while cancelling the task.", e);
}
+ this.failureCause = cause;
+
notifyObservers(ExecutionState.FAILED, null);
- notifyExecutionStateChange(ExecutionState.FAILED, cause);
+ unregisterTask();
return;
}
@@ -309,7 +322,7 @@ public class Task {
if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
notifyObservers(ExecutionState.CANCELED, null);
- notifyExecutionStateChange(ExecutionState.CANCELED, null);
+ unregisterTask();
return;
}
}
@@ -339,6 +352,10 @@ public class Task {
}
}
+ protected void unregisterTask() {
+ taskManager.tell(new UnregisterTask(executionId), ActorRef.noSender());
+ }
+
protected void notifyExecutionStateChange(ExecutionState executionState,
Throwable optionalError) {
LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(),
@@ -377,11 +394,11 @@ public class Task {
return environment != null ? environment.getAllInputGates() : null;
}
- public BufferWriter[] getWriters() {
+ public ResultPartitionWriter[] getWriters() {
return environment != null ? environment.getAllWriters() : null;
}
- public IntermediateResultPartition[] getProducedPartitions() {
+ public ResultPartition[] getProducedPartitions() {
return environment != null ? environment.getProducedPartitions() : null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
index ee5f281..3385ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
@@ -19,49 +19,61 @@
package org.apache.flink.runtime.util;
/**
- * Atomic reference counter, which enters a "disposed" state after the reference
- * count reaches 0.
+ * Atomic reference counter, which enters a "disposed" state after it reaches a configurable
+ * reference count (default 0).
*/
public class AtomicDisposableReferenceCounter {
private final Object lock = new Object();
- private int referenceCounter;
+ private int referenceCount;
private boolean isDisposed;
+ /** Enter the disposed state when the reference count reaches this number. */
+ private final int disposeOnReferenceCount;
+
+ public AtomicDisposableReferenceCounter() {
+ this.disposeOnReferenceCount = 0;
+ }
+
+ public AtomicDisposableReferenceCounter(int disposeOnReferenceCount) {
+ this.disposeOnReferenceCount = disposeOnReferenceCount;
+ }
+
/**
* Increments the reference count and returns whether it was successful.
* <p>
- * If the method returns <code>false</code>, the counter has already been
- * disposed. Otherwise it returns <code>true</code>.
+ * If the method returns <code>false</code>, the counter has already been disposed. Otherwise it
+ * returns <code>true</code>.
*/
- public boolean incrementReferenceCounter() {
+ public boolean increment() {
synchronized (lock) {
if (isDisposed) {
return false;
}
- referenceCounter++;
+ referenceCount++;
return true;
}
}
/**
- * Decrements the reference count.
+ * Decrements the reference count and returns whether the reference counter entered the disposed
+ * state.
* <p>
- * If the method returns <code>true</code>, the decrement operation disposed
- * the counter. Otherwise it returns <code>false</code>.
+ * If the method returns <code>true</code>, the decrement operation disposed the counter.
+ * Otherwise it returns <code>false</code>.
*/
- public boolean decrementReferenceCounter() {
+ public boolean decrement() {
synchronized (lock) {
if (isDisposed) {
return false;
}
- referenceCounter--;
+ referenceCount--;
- if (referenceCounter <= 0) {
+ if (referenceCount <= disposeOnReferenceCount) {
isDisposed = true;
}
@@ -69,9 +81,24 @@ public class AtomicDisposableReferenceCounter {
}
}
+ public int get() {
+ synchronized (lock) {
+ return referenceCount;
+ }
+ }
+
+ /**
+ * Returns whether the reference count has reached the disposed state.
+ */
+ public boolean isDisposed() {
+ synchronized (lock) {
+ return isDisposed;
+ }
+ }
+
public boolean disposeIfNotUsed() {
synchronized (lock) {
- if(referenceCounter <= 0){
+ if (referenceCount <= disposeOnReferenceCount) {
isDisposed = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 98dd7eb..fe66b37 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{ScheduleMode,JobGraph,JobStatus,JobID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobID}
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -97,7 +97,7 @@ class JobManager(val configuration: Configuration,
val delayBetweenRetries: Long,
val timeout: FiniteDuration)
extends Actor with ActorLogMessages with ActorLogging {
-
+
/** Reference to the log, for debugging */
val LOG = JobManager.LOG
@@ -283,20 +283,20 @@ class JobManager(val configuration: Configuration,
if(newJobStatus.isTerminalState) {
jobInfo.end = timeStamp
- // is the client waiting for the job result?
+ // is the client waiting for the job result?
newJobStatus match {
case JobStatus.FINISHED =>
val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID)
- jobInfo.client ! JobResultSuccess(jobID,jobInfo.duration,accumulatorResults)
+ jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults)
case JobStatus.CANCELED =>
jobInfo.client ! Failure(new JobCancellationException(jobID,
- "Job was cancelled.",error))
+ "Job was cancelled.", error))
case JobStatus.FAILED =>
jobInfo.client ! Failure(new JobExecutionException(jobID,
- "Job execution failed.",error))
+ "Job execution failed.", error))
case x =>
- val exception = new JobExecutionException(jobID,s"$x is not a " +
- "terminal state.")
+ val exception = new JobExecutionException(jobID, s"$x is not a " +
+ "terminal state.")
jobInfo.client ! Failure(exception)
throw exception
}
@@ -321,11 +321,11 @@ class JobManager(val configuration: Configuration,
case None =>
}
- case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
+case ScheduleOrUpdateConsumers(jobId, partitionId) =>
currentJobs.get(jobId) match {
case Some((executionGraph, _)) =>
sender ! Acknowledge
- executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex)
+ executionGraph.scheduleOrUpdateConsumers(partitionId)
case None =>
log.error("Cannot find execution graph for job ID {} to schedule or update consumers",
jobId)
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 4630089..17e9138 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorEvent
import org.apache.flink.runtime.client.JobStatusMessage
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.instance.{InstanceID, Instance}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID
import org.apache.flink.runtime.jobgraph.{JobGraph, JobID, JobStatus, JobVertexID}
import org.apache.flink.runtime.taskmanager.TaskExecutionState
@@ -76,18 +77,16 @@ object JobManagerMessages {
* <p>
* There is a call to this method for each
* [[org.apache.flink.runtime.executiongraph.ExecutionVertex]] instance once per produced
- * [[org.apache.flink.runtime.io.network.partition.IntermediateResultPartition]] instance,
+ * [[org.apache.flink.runtime.io.network.partition.ResultPartition]] instance,
* either when first producing data (for pipelined executions) or when all data has been produced
* (for staged executions).
* <p>
* The [[org.apache.flink.runtime.jobmanager.JobManager]] then can decide when to schedule the
* partition consumers of the given session.
*
- * @see [[org.apache.flink.runtime.io.network.partition.IntermediateResultPartition]]
+ * @see [[org.apache.flink.runtime.io.network.partition.ResultPartition]]
*/
- case class ScheduleOrUpdateConsumers(jobId: JobID,
- executionId: ExecutionAttemptID,
- partitionIndex: Int)
+ case class ScheduleOrUpdateConsumers(jobId: JobID, partitionId: ResultPartitionID)
case class ConsumerNotificationResult(success: Boolean, error: Option[Throwable] = None)
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index b7bb060..4e6144a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.messages
import org.apache.flink.core.io.InputSplit
-import org.apache.flink.runtime.deployment.{PartitionInfo, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.instance.InstanceID
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
@@ -65,17 +65,21 @@ object TaskManagerMessages {
def executionID: ExecutionAttemptID
}
- case class UpdateTaskSinglePartitionInfo(executionID: ExecutionAttemptID,
- resultId: IntermediateDataSetID,
- partitionInfo: PartitionInfo) extends UpdateTask
-
- case class UpdateTaskMultiplePartitionInfos(executionID: ExecutionAttemptID,
- partitionInfos: Seq[(IntermediateDataSetID,
- PartitionInfo)]) extends UpdateTask
-
- def createUpdateTaskMultiplePartitionInfos(executionID: ExecutionAttemptID,
- resultIDs: java.util.List[IntermediateDataSetID],
- partitionInfos: java.util.List[PartitionInfo]):
+ case class UpdateTaskSinglePartitionInfo(
+ executionID: ExecutionAttemptID,
+ resultId: IntermediateDataSetID,
+ partitionInfo: InputChannelDeploymentDescriptor)
+ extends UpdateTask
+
+ case class UpdateTaskMultiplePartitionInfos(
+ executionID: ExecutionAttemptID,
+ partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+ extends UpdateTask
+
+ def createUpdateTaskMultiplePartitionInfos(
+ executionID: ExecutionAttemptID,
+ resultIDs: java.util.List[IntermediateDataSetID],
+ partitionInfos: java.util.List[InputChannelDeploymentDescriptor]):
UpdateTaskMultiplePartitionInfos = {
require(resultIDs.size() == partitionInfos.size(), "ResultIDs must have the same length as" +
"partitionInfos.")
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index aeda6c4..f99aac0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.taskmanager
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
import org.apache.flink.runtime.io.network.netty.NettyConfig
case class NetworkEnvironmentConfiguration(numNetworkBuffers: Int,
networkBufferSize: Int,
+ ioMode: IOMode,
nettyConfig: Option[NettyConfig] = None)
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 53c45ce..61cab6b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -33,13 +33,14 @@ import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.BlobCache
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
-import org.apache.flink.runtime.deployment.{PartitionInfo, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.execution.{CancelTaskException, ExecutionState, RuntimeEnvironment}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.filecache.FileCache
import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
+import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
@@ -261,7 +262,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
}
case UnregisterTask(executionID) =>
- unregisterTask(executionID)
+ unregisterTaskAndNotifyFinalState(executionID)
case updateMsg:UpdateTaskExecutionState =>
currentJobManager foreach {
@@ -348,7 +349,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
case FailIntermediateResultPartitions(executionID) =>
log.info("Fail intermediate result partitions associated with execution {}.", executionID)
networkEnvironment foreach {
- _.getPartitionManager.failIntermediateResultPartitions(executionID)
+ _.getPartitionManager.releasePartitionsProducedBy(executionID)
}
case BarrierReq(attemptID, checkpointID) =>
@@ -549,8 +550,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
registrationDuration = 0 seconds
}
- private def updateTask(executionId: ExecutionAttemptID,
- partitionInfos: Seq[(IntermediateDataSetID, PartitionInfo)]): Unit = {
+ private def updateTask(
+ executionId: ExecutionAttemptID,
+ partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]): Unit = {
runningTasks.get(executionId) match {
case Some(task) =>
@@ -685,17 +687,24 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
for (t <- runningTasks.values) {
t.failExternally(cause)
- unregisterTask(t.getExecutionId)
+ unregisterTaskAndNotifyFinalState(t.getExecutionId)
}
}
}
- private def unregisterTask(executionID: ExecutionAttemptID): Unit = {
+ private def unregisterTaskAndNotifyFinalState(executionID: ExecutionAttemptID): Unit = {
runningTasks.remove(executionID) match {
case Some(task) =>
log.info("Unregister task with execution ID {}.", executionID)
removeAllTaskResources(task)
libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
+
+ log.info("Updating FINAL execution state of {} ({}) to {}.", task.getTaskName,
+ task.getExecutionId, task.getExecutionState);
+
+ self ! UpdateTaskExecutionState(new TaskExecutionState(
+ task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
+
case None =>
if (log.isDebugEnabled) {
log.debug("Cannot find task with ID {} to unregister.", executionID)
@@ -1194,7 +1203,19 @@ object TaskManager {
connectionInfo.address(), connectionInfo.dataPort(), pageSize, configuration))
}
- val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, nettyConfig)
+ // Default spill I/O mode for intermediate results
+ val syncOrAsync = configuration.getString(ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE)
+
+ val ioMode : IOMode = if (syncOrAsync == "async") {
+ IOMode.ASYNC
+ }
+ else {
+ IOMode.SYNC
+ }
+
+ val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, ioMode,
+ nettyConfig)
val networkBufferMem = numNetworkBuffers * pageSize
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 1a274d0..5742fea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -48,8 +48,8 @@ public class TaskDeploymentDescriptorTest {
final Configuration jobConfiguration = new Configuration();
final Configuration taskConfiguration = new Configuration();
final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
- final List<PartitionDeploymentDescriptor> producedResults = new ArrayList<PartitionDeploymentDescriptor>(0);
- final List<PartitionConsumerDeploymentDescriptor> inputGates = new ArrayList<PartitionConsumerDeploymentDescriptor>(0);
+ final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
+ final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
@@ -70,7 +70,7 @@ public class TaskDeploymentDescriptorTest {
assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup());
assertEquals(orig.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
- assertEquals(orig.getConsumedPartitions(), copy.getConsumedPartitions());
+ assertEquals(orig.getInputGates(), copy.getInputGates());
assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 526ba7f..c979c42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -38,8 +38,8 @@ import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
@@ -138,15 +138,15 @@ public class ExecutionGraphDeploymentTest {
assertEquals(RegularPactTask.class.getName(), descr.getInvokableClassName());
assertEquals("v2", descr.getTaskName());
- List<PartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions();
- List<PartitionConsumerDeploymentDescriptor> consumedPartitions = descr.getConsumedPartitions();
+ List<ResultPartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions();
+ List<InputGateDeploymentDescriptor> consumedPartitions = descr.getInputGates();
assertEquals(2, producedPartitions.size());
assertEquals(1, consumedPartitions.size());
- assertEquals(10, producedPartitions.get(0).getNumberOfQueues());
- assertEquals(10, producedPartitions.get(1).getNumberOfQueues());
- assertEquals(10, consumedPartitions.get(0).getPartitions().length);
+ assertEquals(10, producedPartitions.get(0).getNumberOfSubpartitions());
+ assertEquals(10, producedPartitions.get(1).getNumberOfSubpartitions());
+ assertEquals(10, consumedPartitions.get(0).getInputChannelDeploymentDescriptors().length);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index c05fcca..0462b3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -107,7 +107,7 @@ public class ChannelViewsTest
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -120,7 +120,7 @@ public class ChannelViewsTest
// create the reader input view
memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
generator.reset();
@@ -151,7 +151,7 @@ public class ChannelViewsTest
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -164,7 +164,7 @@ public class ChannelViewsTest
// create the reader input view
memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
generator.reset();
@@ -192,7 +192,7 @@ public class ChannelViewsTest
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -205,7 +205,7 @@ public class ChannelViewsTest
// create the reader input view
memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
generator.reset();
@@ -243,7 +243,7 @@ public class ChannelViewsTest
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -256,7 +256,7 @@ public class ChannelViewsTest
// create the reader input view
memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, true);
generator.reset();
@@ -287,7 +287,7 @@ public class ChannelViewsTest
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -300,7 +300,7 @@ public class ChannelViewsTest
// create the reader input view
memory = this.memoryManager.allocatePages(this.parentTask, 1);
- final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
generator.reset();
@@ -331,7 +331,7 @@ public class ChannelViewsTest
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -344,7 +344,7 @@ public class ChannelViewsTest
// create the reader input view
memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
generator.reset();
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 27928a9..85d2113 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -88,7 +88,7 @@ public class FileChannelStreamsITCase {
final FileIOChannel.ID channel = ioManager.createChannel();
// create the writer output view
- final BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -102,7 +102,7 @@ public class FileChannelStreamsITCase {
// create the reader input view
List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
generator.reset();
@@ -132,7 +132,7 @@ public class FileChannelStreamsITCase {
final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -146,7 +146,7 @@ public class FileChannelStreamsITCase {
// create the reader input view
List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
generator.reset();
@@ -176,7 +176,7 @@ public class FileChannelStreamsITCase {
final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -190,7 +190,7 @@ public class FileChannelStreamsITCase {
// create the reader input view
List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
generator.reset();
@@ -226,7 +226,7 @@ public class FileChannelStreamsITCase {
final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -240,7 +240,7 @@ public class FileChannelStreamsITCase {
// create the reader input view
List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), 1);
- final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
generator.reset();
@@ -270,7 +270,7 @@ public class FileChannelStreamsITCase {
final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+ final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
@@ -284,7 +284,7 @@ public class FileChannelStreamsITCase {
// create the reader input view
List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
- final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+ final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
generator.reset();
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1db2a6f..1f6899d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -49,7 +49,7 @@ public class FileChannelStreamsTest {
memMan.allocatePages(new DummyInvokable(), memory, 4);
FileIOChannel.ID channel = ioManager.createChannel();
- BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+ BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
new StringValue("Some test text").write(out);
@@ -91,7 +91,7 @@ public class FileChannelStreamsTest {
wrt.close();
}
- BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+ BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);
// read just something
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index 7e4d70d..f090ef1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -50,7 +50,7 @@ public class SeekableFileChannelInputViewTest {
memMan.allocatePages(new DummyInvokable(), memory, 4);
FileIOChannel.ID channel = ioManager.createChannel();
- BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+ BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
// write some integers across 7.5 pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
new file mode 100644
index 0000000..0397de5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestNotificationListener;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests how {@link AsynchronousBufferFileWriter} tracks outstanding write requests and
+ * notifies listeners once all of them have been processed.
+ *
+ * <p>Every writer is created with its own, freshly created {@link RequestQueue}; the tests
+ * simulate request completion by calling {@code handleProcessedBuffer} themselves.
+ */
+public class AsynchronousBufferFileWriterTest {
+
+	/** Shared by all tests to create channel IDs; released in {@link #shutdown()}. */
+	private static final IOManager ioManager = new IOManagerAsync();
+
+	private static final Buffer mockBuffer = mock(Buffer.class);
+
+	private AsynchronousBufferFileWriter writer;
+
+	/** Creates a fresh writer on a new channel with its own request queue. */
+	@Before
+	public void setUp() throws IOException {
+		writer = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<WriteRequest>());
+	}
+
+	/** Shuts down the shared I/O manager after the last test has run. */
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
+	/** Adding a request increments the outstanding-request count; handling it decrements the count again. */
+	@Test
+	public void testAddAndHandleRequest() throws Exception {
+		addRequest();
+		assertEquals("Didn't increment number of outstanding requests.", 1, writer.getNumberOfOutstandingRequests());
+
+		handleRequest();
+		assertEquals("Didn't decrement number of outstanding requests.", 0, writer.getNumberOfOutstandingRequests());
+	}
+
+	/** A listener can only be registered while requests are outstanding and is notified once they are handled. */
+	@Test
+	public void testSubscribe() throws Exception {
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		// Unsuccessful subscription, because no outstanding requests
+		assertFalse("Allowed to subscribe w/o any outstanding requests.", writer.registerAllRequestsProcessedListener(listener));
+
+		// Successful subscription
+		addRequest();
+		assertTrue("Didn't allow to subscribe.", writer.registerAllRequestsProcessedListener(listener));
+
+		// Test notification
+		handleRequest();
+
+		assertEquals("Listener was not notified.", 1, listener.getNumberOfNotifications());
+	}
+
+	/**
+	 * Closes the writer from a second thread while two requests are outstanding and verifies
+	 * that closing succeeds and that the registered listener is notified exactly once.
+	 */
+	@Test
+	public void testSubscribeAndClose() throws IOException, InterruptedException {
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+		final CountDownLatch sync = new CountDownLatch(1);
+
+		addRequest();
+		addRequest();
+
+		writer.registerAllRequestsProcessedListener(listener);
+
+		final Thread asyncCloseThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					writer.close();
+				}
+				catch (Throwable t) {
+					error.set(t);
+				}
+				finally {
+					sync.countDown();
+				}
+			}
+		});
+
+		asyncCloseThread.start();
+
+		handleRequest();
+		handleRequest();
+
+		sync.await();
+
+		// A failure in the closing thread must not go unnoticed
+		assertNull("Closing the writer failed.", error.get());
+		assertEquals("Listener was not notified.", 1, listener.getNumberOfNotifications());
+	}
+
+	/**
+	 * Races listener registration against request handling and checks that the listener is
+	 * notified if and only if the registration was successful.
+	 */
+	@Test
+	public void testConcurrentSubscribeAndHandleRequest() throws Exception {
+		final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		final Callable<Boolean> subscriber = new Callable<Boolean>() {
+			@Override
+			public Boolean call() throws Exception {
+				return writer.registerAllRequestsProcessedListener(listener);
+			}
+		};
+
+		final Callable<Void> requestHandler = new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				handleRequest();
+				return null;
+			}
+		};
+
+		try {
+			// Repeat this to provoke races
+			for (int i = 0; i < 50000; i++) {
+				listener.reset();
+
+				addRequest();
+
+				Future<Void> handleRequestFuture = executor.submit(requestHandler);
+				Future<Boolean> subscribeFuture = executor.submit(subscriber);
+
+				handleRequestFuture.get();
+
+				try {
+					if (subscribeFuture.get()) {
+						assertEquals("Race: Successfully subscribed, but was never notified.", 1, listener.getNumberOfNotifications());
+					}
+					else {
+						assertEquals("Race: Never subscribed successfully, but was notified.", 0, listener.getNumberOfNotifications());
+					}
+				}
+				catch (Throwable t) {
+					// Report the failing iteration and keep the original failure as the cause
+					final AssertionError failure = new AssertionError("Failed in iteration " + i + ": " + t);
+					failure.initCause(t);
+					throw failure;
+				}
+			}
+		}
+		finally {
+			executor.shutdownNow();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void addRequest() throws IOException {
+		writer.writeBlock(mockBuffer);
+	}
+
+	private void handleRequest() {
+		writer.handleProcessedBuffer(mockBuffer, null);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
new file mode 100644
index 0000000..49e93c6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestNotificationListener;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AsynchronousFileIOChannel}: registration and notification of "all requests
+ * processed" listeners under concurrency, the callback count observed after {@code close()},
+ * and the forwarding of I/O errors to the caller.
+ */
+@RunWith(PowerMockRunner.class)
+public class AsynchronousFileIOChannelTest {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AsynchronousFileIOChannelTest.class);
+
+	/**
+	 * Concurrently adds requests, processes them, and keeps registering a listener until the
+	 * channel is closed. Fails if the three tasks do not finish within the timeout.
+	 */
+	@Test
+	public void testAllRequestsProcessedListenerNotification() throws Exception {
+		// -- Config ----------------------------------------------------------
+		final int numberOfRuns = 10;
+		final int numberOfRequests = 100;
+
+		// -- Setup -----------------------------------------------------------
+		final IOManagerAsync ioManager = new IOManagerAsync();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(3);
+
+		final Random random = new Random();
+
+		final RequestQueue<WriteRequest> requestQueue = new RequestQueue<WriteRequest>();
+
+		@SuppressWarnings("unchecked")
+		final RequestDoneCallback<Buffer> ioChannelCallback = mock(RequestDoneCallback.class);
+
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		// -- The Test --------------------------------------------------------
+		try {
+			// Repeatedly add requests and process them and have one thread try to register as a
+			// listener until the channel is closed and all requests are processed.
+
+			for (int run = 0; run < numberOfRuns; run++) {
+				final TestAsyncFileIOChannel ioChannel = new TestAsyncFileIOChannel(
+						ioManager.createChannel(), requestQueue, ioChannelCallback, true);
+
+				final CountDownLatch sync = new CountDownLatch(3);
+
+				// The mock requests
+				final Buffer buffer = mock(Buffer.class);
+				final WriteRequest request = mock(WriteRequest.class);
+
+				// Add requests task
+				Callable<Void> addRequestsTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						for (int i = 0; i < numberOfRuns; i++) {
+							LOG.debug("Starting run {}.", i + 1);
+
+							for (int j = 0; j < numberOfRequests; j++) {
+								ioChannel.addRequest(request);
+							}
+
+							LOG.debug("Added all ({}) requests of run {}.", numberOfRequests, i + 1);
+
+							int sleep = random.nextInt(10);
+							LOG.debug("Sleeping for {} ms before next run.", sleep);
+
+							Thread.sleep(sleep);
+						}
+
+						LOG.debug("Done. Closing channel.");
+						ioChannel.close();
+
+						sync.countDown();
+
+						return null;
+					}
+				};
+
+				// Process requests task
+				Callable<Void> processRequestsTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						int total = numberOfRequests * numberOfRuns;
+						for (int i = 0; i < total; i++) {
+							requestQueue.take();
+
+							ioChannel.handleProcessedBuffer(buffer, null);
+						}
+
+						LOG.debug("Processed all ({}) requests.", total);
+
+						sync.countDown();
+
+						return null;
+					}
+				};
+
+				// Listener
+				Callable<Void> registerListenerTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						while (true) {
+							int current = listener.getNumberOfNotifications();
+
+							if (ioChannel.registerAllRequestsProcessedListener(listener)) {
+								listener.waitForNotification(current);
+							}
+							else if (ioChannel.isClosed()) {
+								break;
+							}
+						}
+
+						LOG.debug("Stopping listener. Channel closed.");
+
+						sync.countDown();
+
+						return null;
+					}
+				};
+
+				// Run tasks in random order
+				final List<Callable<?>> tasks = new LinkedList<Callable<?>>();
+				tasks.add(addRequestsTask);
+				tasks.add(processRequestsTask);
+				tasks.add(registerListenerTask);
+
+				Collections.shuffle(tasks);
+
+				for (Callable<?> task : tasks) {
+					executor.submit(task);
+				}
+
+				if (!sync.await(2, TimeUnit.MINUTES)) {
+					fail("Test failed due to a timeout. This indicates a deadlock due to the way " +
+							"that listeners are registered/notified in the asynchronous file I/O " +
+							"channel.");
+				}
+
+				listener.reset();
+			}
+		}
+		finally {
+			ioManager.shutdown();
+			// Interrupt tasks that are still blocked, e.g. after the timeout above
+			executor.shutdownNow();
+		}
+	}
+
+	/**
+	 * Closes the channel first and then races adding a request against registering a listener.
+	 * Fails if both tasks do not finish within the timeout.
+	 */
+	@Test
+	public void testClosedButAddRequestAndRegisterListenerRace() throws Exception {
+		// -- Config ----------------------------------------------------------
+		final int numberOfRuns = 1024;
+
+		// -- Setup -----------------------------------------------------------
+		final IOManagerAsync ioManager = new IOManagerAsync();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+		final RequestQueue<WriteRequest> requestQueue = new RequestQueue<WriteRequest>();
+
+		@SuppressWarnings("unchecked")
+		final RequestDoneCallback<Buffer> ioChannelCallback = mock(RequestDoneCallback.class);
+
+		final TestNotificationListener listener = new TestNotificationListener();
+
+		// -- The Test --------------------------------------------------------
+		try {
+			// Repeatedly close the channel and add a request.
+			for (int i = 0; i < numberOfRuns; i++) {
+				final TestAsyncFileIOChannel ioChannel = new TestAsyncFileIOChannel(
+						ioManager.createChannel(), requestQueue, ioChannelCallback, true);
+
+				final CountDownLatch sync = new CountDownLatch(2);
+
+				final WriteRequest request = mock(WriteRequest.class);
+
+				ioChannel.close();
+
+				// Add request task
+				Callable<Void> addRequestTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						try {
+							ioChannel.addRequest(request);
+						}
+						catch (Throwable expected) {
+						}
+						finally {
+							sync.countDown();
+						}
+
+						return null;
+					}
+				};
+
+				// Listener
+				Callable<Void> registerListenerTask = new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						try {
+							while (true) {
+								int current = listener.getNumberOfNotifications();
+
+								if (ioChannel.registerAllRequestsProcessedListener(listener)) {
+									listener.waitForNotification(current);
+								}
+								else if (ioChannel.isClosed()) {
+									break;
+								}
+							}
+						}
+						finally {
+							sync.countDown();
+						}
+
+						return null;
+					}
+				};
+
+				executor.submit(addRequestTask);
+				executor.submit(registerListenerTask);
+
+				if (!sync.await(2, TimeUnit.MINUTES)) {
+					fail("Test failed due to a timeout. This indicates a deadlock due to the way " +
+							"that listeners are registered/notified in the asynchronous file I/O " +
+							"channel.");
+				}
+			}
+		}
+		finally {
+			ioManager.shutdown();
+			// Interrupt tasks that are still blocked, e.g. after the timeout above
+			executor.shutdownNow();
+		}
+	}
+
+	/** Writes 100 blocks and checks that, once {@code close()} has returned, the callback ran once per block. */
+	@Test
+	public void testClosingWaits() {
+		IOManagerAsync ioMan = new IOManagerAsync();
+		try {
+
+			final int NUM_BLOCKS = 100;
+			final MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+
+			final AtomicInteger callbackCounter = new AtomicInteger();
+			final AtomicBoolean exceptionOccurred = new AtomicBoolean();
+
+			final RequestDoneCallback<MemorySegment> callback = new RequestDoneCallback<MemorySegment>() {
+
+				@Override
+				public void requestSuccessful(MemorySegment buffer) {
+					// we do the non safe variant. the callbacks should come in order from
+					// the same thread, so it should always work
+					callbackCounter.set(callbackCounter.get() + 1);
+
+					if (buffer != seg) {
+						exceptionOccurred.set(true);
+					}
+				}
+
+				@Override
+				public void requestFailed(MemorySegment buffer, IOException e) {
+					exceptionOccurred.set(true);
+				}
+			};
+
+			BlockChannelWriterWithCallback<MemorySegment> writer = ioMan.createBlockChannelWriter(ioMan.createChannel(), callback);
+			try {
+				for (int i = 0; i < NUM_BLOCKS; i++) {
+					writer.writeBlock(seg);
+				}
+
+				writer.close();
+
+				assertEquals(NUM_BLOCKS, callbackCounter.get());
+				assertFalse(exceptionOccurred.get());
+			}
+			finally {
+				writer.closeAndDelete();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			ioMan.shutdown();
+		}
+	}
+
+	/** Runs the failure scenario with the first, the 50th, and the last of 100 blocks failing. */
+	@Test
+	public void testExceptionForwardsToClose() {
+		IOManagerAsync ioMan = new IOManagerAsync();
+		try {
+			testExceptionForwardsToClose(ioMan, 100, 1);
+			testExceptionForwardsToClose(ioMan, 100, 50);
+			testExceptionForwardsToClose(ioMan, 100, 100);
+		} finally {
+			ioMan.shutdown();
+		}
+	}
+
+	/**
+	 * Writes {@code numBlocks} blocks, replacing the {@code failingBlock}-th (1-based) write with a
+	 * failing request, and expects an {@link IOException} from a later write or from {@code close()}.
+	 */
+	private void testExceptionForwardsToClose(IOManagerAsync ioMan, final int numBlocks, final int failingBlock) {
+		try {
+			MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+			FileIOChannel.ID channelId = ioMan.createChannel();
+
+			BlockChannelWriterWithCallback<MemorySegment> writer = new AsynchronousBlockWriterWithCallback(channelId,
+					ioMan.getWriteRequestQueue(channelId), new NoOpCallback()) {
+
+				private int writtenBlocks;
+
+				@Override
+				public void writeBlock(MemorySegment segment) throws IOException {
+					writtenBlocks++;
+
+					if (writtenBlocks == failingBlock) {
+						this.requestsNotReturned.incrementAndGet();
+						this.requestQueue.add(new FailingWriteRequest(this, segment));
+					} else {
+						super.writeBlock(segment);
+					}
+				}
+			};
+
+			try {
+				for (int i = 0; i < numBlocks; i++) {
+					writer.writeBlock(seg);
+				}
+
+				writer.close();
+				fail("did not forward exception");
+			}
+			catch (IOException e) {
+				// expected
+			}
+			finally {
+				try {
+					writer.closeAndDelete();
+				} catch (Throwable ignored) {
+					// best-effort cleanup; the outcome of the test is already decided at this point
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/** Callback that ignores both successful and failed requests. */
+	private static class NoOpCallback implements RequestDoneCallback<MemorySegment> {
+
+		@Override
+		public void requestSuccessful(MemorySegment buffer) {}
+
+		@Override
+		public void requestFailed(MemorySegment buffer, IOException e) {}
+	}
+
+	/** Write request whose {@code write()} always throws; completion is forwarded to the target channel. */
+	private static class FailingWriteRequest implements WriteRequest {
+
+		private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
+
+		private final MemorySegment segment;
+
+		protected FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
+			this.channel = targetChannel;
+			this.segment = segment;
+		}
+
+		@Override
+		public void write() throws IOException {
+			throw new IOException();
+		}
+
+		@Override
+		public void requestDone(IOException ioex) {
+			this.channel.handleProcessedBuffer(this.segment, ioex);
+		}
+	}
+
+	/** Concrete channel for the listener tests; additionally exposes the outstanding-request counter. */
+	private static class TestAsyncFileIOChannel extends AsynchronousFileIOChannel<Buffer, WriteRequest> {
+
+		protected TestAsyncFileIOChannel(
+				ID channelID,
+				RequestQueue<WriteRequest> requestQueue,
+				RequestDoneCallback<Buffer> callback,
+				boolean writeEnabled) throws IOException {
+
+			super(channelID, requestQueue, callback, writeEnabled);
+		}
+
+		int getNumberOfOutstandingRequests() {
+			return requestsNotReturned.get();
+		}
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
deleted file mode 100644
index 0ed6233..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-public class AsynchronousFileIOChannelsTest {
-
- @Test
- public void testClosingWaits() {
- IOManagerAsync ioMan = new IOManagerAsync();
- try {
-
- final int NUM_BLOCKS = 100;
- final MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
-
- final AtomicInteger callbackCounter = new AtomicInteger();
- final AtomicBoolean exceptionOccurred = new AtomicBoolean();
-
- final RequestDoneCallback<MemorySegment> callback = new RequestDoneCallback<MemorySegment>() {
-
- @Override
- public void requestSuccessful(MemorySegment buffer) {
- // we do the non safe variant. the callbacks should come in order from
- // the same thread, so it should always work
- callbackCounter.set(callbackCounter.get() + 1);
-
- if (buffer != seg) {
- exceptionOccurred.set(true);
- }
- }
-
- @Override
- public void requestFailed(MemorySegment buffer, IOException e) {
- exceptionOccurred.set(true);
- }
- };
-
- BlockChannelWriterWithCallback writer = ioMan.createBlockChannelWriter(ioMan.createChannel(), callback);
- try {
- for (int i = 0; i < NUM_BLOCKS; i++) {
- writer.writeBlock(seg);
- }
-
- writer.close();
-
- assertEquals(NUM_BLOCKS, callbackCounter.get());
- assertFalse(exceptionOccurred.get());
- }
- finally {
- writer.closeAndDelete();
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- ioMan.shutdown();
- }
- }
-
- @Test
- public void testExceptionForwardsToClose() {
- IOManagerAsync ioMan = new IOManagerAsync();
- try {
- testExceptionForwardsToClose(ioMan, 100, 1);
- testExceptionForwardsToClose(ioMan, 100, 50);
- testExceptionForwardsToClose(ioMan, 100, 100);
- } finally {
- ioMan.shutdown();
- }
- }
-
- private void testExceptionForwardsToClose(IOManagerAsync ioMan, final int numBlocks, final int failingBlock) {
- try {
- MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
- FileIOChannel.ID channelId = ioMan.createChannel();
-
- BlockChannelWriterWithCallback writer = new AsynchronousBlockWriterWithCallback(channelId,
- ioMan.getWriteRequestQueue(channelId), new NoOpCallback()) {
-
- private int numBlocks;
-
- @Override
- public void writeBlock(MemorySegment segment) throws IOException {
- numBlocks++;
-
- if (numBlocks == failingBlock) {
- this.requestsNotReturned.incrementAndGet();
- this.requestQueue.add(new FailingWriteRequest(this, segment));
- } else {
- super.writeBlock(segment);
- }
- }
- };
-
- try {
- for (int i = 0; i < numBlocks; i++) {
- writer.writeBlock(seg);
- }
-
- writer.close();
- fail("did not forward exception");
- }
- catch (IOException e) {
- // expected
- }
- finally {
- try {
- writer.closeAndDelete();
- } catch (Throwable t) {}
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private static class NoOpCallback implements RequestDoneCallback<MemorySegment> {
-
- @Override
- public void requestSuccessful(MemorySegment buffer) {}
-
- @Override
- public void requestFailed(MemorySegment buffer, IOException e) {}
- }
-
- private static class FailingWriteRequest implements WriteRequest {
-
- private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
-
- private final MemorySegment segment;
-
- protected FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
- this.channel = targetChannel;
- this.segment = segment;
- }
-
- @Override
- public void write() throws IOException {
- throw new IOException();
- }
-
- @Override
- public void requestDone(IOException ioex) {
- this.channel.handleProcessedBuffer(this.segment, ioex);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
new file mode 100644
index 0000000..294a6e6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+import org.apache.flink.runtime.util.event.NotificationListener;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Writes buffers with a {@link BufferFileWriter} and reads them back as {@link FileSegment}
+ * instances via an {@link AsynchronousBufferFileSegmentReader}, verifying the content.
+ */
+public class BufferFileWriterFileSegmentReaderTest {
+
+    private static final int BUFFER_SIZE = 32 * 1024;
+
+    private static final BufferRecycler BUFFER_RECYCLER = new DiscardingRecycler();
+
+    private static final Random random = new Random();
+
+    private static final IOManager ioManager = new IOManagerAsync();
+
+    private BufferFileWriter writer;
+
+    private AsynchronousBufferFileSegmentReader reader;
+
+    private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue<FileSegment>();
+
+    /** Shuts down the I/O manager shared by all tests so it does not outlive the test class. */
+    @AfterClass
+    public static void shutdown() {
+        ioManager.shutdown();
+    }
+
+    /** Creates a new channel and opens a buffer file writer and a file segment reader on it. */
+    @Before
+    public void setUpWriterAndReader() {
+        final FileIOChannel.ID channel = ioManager.createChannel();
+
+        try {
+            writer = ioManager.createBufferFileWriter(channel);
+            reader = (AsynchronousBufferFileSegmentReader) ioManager.createBufferFileSegmentReader(channel, new QueuingCallback<FileSegment>(returnedFileSegments));
+        }
+        catch (IOException e) {
+            if (writer != null) {
+                writer.deleteChannel();
+            }
+
+            if (reader != null) {
+                reader.deleteChannel();
+            }
+
+            fail("Failed to setup writer and reader.");
+        }
+    }
+
+    /** Calls deleteChannel() on writer and reader and clears the queue of returned segments. */
+    @After
+    public void tearDownWriterAndReader() {
+        if (writer != null) {
+            writer.deleteChannel();
+        }
+
+        if (reader != null) {
+            reader.deleteChannel();
+        }
+
+        returnedFileSegments.clear();
+    }
+
+    /** Writes randomly sized buffers of ascending ints and verifies them via the returned file segments. */
+    @Test
+    public void testWriteRead() throws IOException, InterruptedException {
+        int numBuffers = 1024;
+        int currentNumber = 0;
+
+        final int minBufferSize = BUFFER_SIZE / 4;
+
+        // Write buffers filled with ascending numbers...
+        for (int i = 0; i < numBuffers; i++) {
+            final Buffer buffer = createBuffer();
+
+            int size = getNextMultipleOf(getRandomNumberInRange(minBufferSize, BUFFER_SIZE), 4);
+
+            buffer.setSize(size);
+
+            currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+
+            writer.writeBlock(buffer);
+        }
+
+        // Make sure that the writes are finished
+        writer.close();
+
+        // Read buffers back in...
+        for (int i = 0; i < numBuffers; i++) {
+            assertFalse(reader.hasReachedEndOfFile());
+            reader.read();
+        }
+
+        // Wait for all requests to be finished
+        final CountDownLatch sync = new CountDownLatch(1);
+        final NotificationListener listener = new NotificationListener() {
+            @Override
+            public void onNotification() {
+                sync.countDown();
+            }
+        };
+
+        if (reader.registerAllRequestsProcessedListener(listener)) {
+            sync.await();
+        }
+
+        assertTrue(reader.hasReachedEndOfFile());
+
+        // Verify that the content is the same
+        assertEquals("Read less buffers than written.", numBuffers, returnedFileSegments.size());
+
+        currentNumber = 0;
+        FileSegment fileSegment;
+
+        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+
+        while ((fileSegment = returnedFileSegments.poll()) != null) {
+            final int length = fileSegment.getLength();
+
+            buffer.clear();
+            buffer.limit(length);
+
+            // A single positional read may transfer fewer bytes than requested, so keep
+            // reading from the advancing file offset until the whole segment is in the buffer.
+            while (buffer.hasRemaining()) {
+                final int numRead = fileSegment.getFileChannel().read(buffer, fileSegment.getPosition() + buffer.position());
+
+                if (numRead < 0) {
+                    fail("Reached end of file before the file segment was read completely.");
+                }
+            }
+
+            currentNumber = verifyBufferFilledWithAscendingNumbers(new Buffer(new MemorySegment(buffer.array()), BUFFER_RECYCLER), currentNumber, fileSegment.getLength());
+        }
+
+        reader.close();
+    }
+
+    // ------------------------------------------------------------------------
+
+    /** Returns a random number in the closed range [min, max]. */
+    private int getRandomNumberInRange(int min, int max) {
+        return random.nextInt((max - min) + 1) + min;
+    }
+
+    /** Rounds the number up to the next multiple; exact multiples are returned unchanged. */
+    private int getNextMultipleOf(int number, int multiple) {
+        final int mod = number % multiple;
+
+        if (mod == 0) {
+            return number;
+        }
+
+        return number + multiple - mod;
+    }
+
+    /** Creates a buffer backed by a new BUFFER_SIZE byte array and the shared recycler. */
+    private Buffer createBuffer() {
+        return new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), BUFFER_RECYCLER);
+    }
+
+    /** Fills the buffer, up to its size, with consecutive ints and returns the next unused number. */
+    public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
+        MemorySegment segment = buffer.getMemorySegment();
+
+        final int size = buffer.getSize();
+
+        for (int i = 0; i < size; i += 4) {
+            segment.putInt(i, currentNumber++);
+        }
+
+        return currentNumber;
+    }
+
+    /** Checks that the first size bytes hold consecutive ints and returns the next expected number. */
+    private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber, int size) {
+        MemorySegment segment = buffer.getMemorySegment();
+
+        for (int i = 0; i < size; i += 4) {
+            if (segment.getInt(i) != currentNumber++) {
+                throw new IllegalStateException("Read unexpected number from buffer.");
+            }
+        }
+
+        return currentNumber;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
new file mode 100644
index 0000000..b0c702a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Writes buffers with a {@link BufferFileWriter} and reads them back with a
+ * {@link BufferFileReader}, both from the start of the file and after seeking past a prefix.
+ */
+public class BufferFileWriterReaderTest {
+
+    private static final int BUFFER_SIZE = 32 * 1024;
+
+    private static final BufferRecycler BUFFER_RECYCLER = new DiscardingRecycler();
+
+    private static final Random random = new Random();
+
+    private static final IOManager ioManager = new IOManagerAsync();
+
+    private BufferFileWriter writer;
+
+    private BufferFileReader reader;
+
+    private LinkedBlockingQueue<Buffer> returnedBuffers = new LinkedBlockingQueue<Buffer>();
+
+    /** Shuts down the I/O manager shared by all tests so it does not outlive the test class. */
+    @AfterClass
+    public static void shutdown() {
+        ioManager.shutdown();
+    }
+
+    /** Creates a new channel and opens a buffer file writer and a buffer file reader on it. */
+    @Before
+    public void setUpWriterAndReader() {
+        final FileIOChannel.ID channel = ioManager.createChannel();
+
+        try {
+            writer = ioManager.createBufferFileWriter(channel);
+            reader = ioManager.createBufferFileReader(channel, new QueuingCallback<Buffer>(returnedBuffers));
+        }
+        catch (IOException e) {
+            if (writer != null) {
+                writer.deleteChannel();
+            }
+
+            if (reader != null) {
+                reader.deleteChannel();
+            }
+
+            fail("Failed to setup writer and reader.");
+        }
+    }
+
+    /** Calls deleteChannel() on writer and reader and clears the queue of returned buffers. */
+    @After
+    public void tearDownWriterAndReader() {
+        if (writer != null) {
+            writer.deleteChannel();
+        }
+
+        if (reader != null) {
+            reader.deleteChannel();
+        }
+
+        returnedBuffers.clear();
+    }
+
+    /** Writes randomly sized buffers of ascending ints, reads all of them back and verifies the content. */
+    @Test
+    public void testWriteRead() throws IOException {
+        int numBuffers = 1024;
+        int currentNumber = 0;
+
+        final int minBufferSize = BUFFER_SIZE / 4;
+
+        // Write buffers filled with ascending numbers...
+        for (int i = 0; i < numBuffers; i++) {
+            final Buffer buffer = createBuffer();
+
+            int size = getNextMultipleOf(getRandomNumberInRange(minBufferSize, BUFFER_SIZE), 4);
+
+            buffer.setSize(size);
+
+            currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+
+            writer.writeBlock(buffer);
+        }
+
+        // Make sure that the writes are finished
+        writer.close();
+
+        // Read buffers back in...
+        for (int i = 0; i < numBuffers; i++) {
+            assertFalse(reader.hasReachedEndOfFile());
+            reader.readInto(createBuffer());
+        }
+
+        reader.close();
+
+        assertTrue(reader.hasReachedEndOfFile());
+
+        // Verify that the content is the same
+        assertEquals("Read less buffers than written.", numBuffers, returnedBuffers.size());
+
+        currentNumber = 0;
+        Buffer buffer;
+
+        while ((buffer = returnedBuffers.poll()) != null) {
+            currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
+        }
+    }
+
+    /** Writes full-size buffers, seeks past the first ones and verifies the remaining buffers. */
+    @Test
+    public void testWriteSkipRead() throws IOException {
+        int numBuffers = 1024;
+        int currentNumber = 0;
+
+        // Write buffers filled with ascending numbers...
+        for (int i = 0; i < numBuffers; i++) {
+            final Buffer buffer = createBuffer();
+
+            currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+
+            writer.writeBlock(buffer);
+        }
+
+        // Make sure that the writes are finished
+        writer.close();
+
+        final int toSkip = 32;
+
+        // Skip first buffers...
+        // NOTE(review): the 8 presumably accounts for a per-buffer header written by the writer - confirm
+        reader.seekToPosition((8 + BUFFER_SIZE) * toSkip);
+
+        numBuffers -= toSkip;
+
+        // Read buffers back in...
+        for (int i = 0; i < numBuffers; i++) {
+            assertFalse(reader.hasReachedEndOfFile());
+            reader.readInto(createBuffer());
+        }
+
+        reader.close();
+
+        assertTrue(reader.hasReachedEndOfFile());
+
+        // Verify that the content is the same
+        assertEquals("Read less buffers than written.", numBuffers, returnedBuffers.size());
+
+        // Start number after skipped buffers...
+        currentNumber = (BUFFER_SIZE / 4) * toSkip;
+
+        Buffer buffer;
+        while ((buffer = returnedBuffers.poll()) != null) {
+            currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /** Returns a random number in the closed range [min, max]. */
+    private int getRandomNumberInRange(int min, int max) {
+        return random.nextInt((max - min) + 1) + min;
+    }
+
+    /** Rounds the number up to the next multiple; exact multiples are returned unchanged. */
+    private int getNextMultipleOf(int number, int multiple) {
+        final int mod = number % multiple;
+
+        if (mod == 0) {
+            return number;
+        }
+
+        return number + multiple - mod;
+    }
+
+    /** Creates a buffer backed by a new BUFFER_SIZE byte array and the shared recycler. */
+    private Buffer createBuffer() {
+        return new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), BUFFER_RECYCLER);
+    }
+
+    /** Fills the buffer, up to its size, with consecutive ints and returns the next unused number. */
+    public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
+        MemorySegment segment = buffer.getMemorySegment();
+
+        final int size = buffer.getSize();
+
+        for (int i = 0; i < size; i += 4) {
+            segment.putInt(i, currentNumber++);
+        }
+
+        return currentNumber;
+    }
+
+    /** Checks that the buffer, up to its size, holds consecutive ints and returns the next expected number. */
+    private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
+        MemorySegment segment = buffer.getMemorySegment();
+
+        final int size = buffer.getSize();
+
+        for (int i = 0; i < size; i += 4) {
+            if (segment.getInt(i) != currentNumber++) {
+                throw new IllegalStateException("Read unexpected number from buffer.");
+            }
+        }
+
+        return currentNumber;
+    }
+}