You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/11 11:48:48 UTC
[1/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager
Readers and Writers to interfaces,
add implementation that uses callbacks on completed write requests.
Repository: incubator-flink
Updated Branches:
refs/heads/master 8e4c772ad -> c9cfe3ba9
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index d5c5760..78951d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -64,7 +64,7 @@ public class IOManagerITCase {
@Before
public void beforeTest() {
memoryManager = new DefaultMemoryManager(NUMBER_OF_SEGMENTS * SEGMENT_SIZE, 1);
- ioManager = new IOManager();
+ ioManager = new IOManagerAsync();
}
@After
@@ -91,7 +91,7 @@ public class IOManagerITCase {
final Random rnd = new Random(SEED);
final AbstractInvokable memOwner = new DefaultMemoryManagerTest.DummyInvokable();
- Channel.ID[] ids = new Channel.ID[NUM_CHANNELS];
+ FileIOChannel.ID[] ids = new FileIOChannel.ID[NUM_CHANNELS];
BlockChannelWriter[] writers = new BlockChannelWriter[NUM_CHANNELS];
BlockChannelReader[] readers = new BlockChannelReader[NUM_CHANNELS];
ChannelWriterOutputView[] outs = new ChannelWriterOutputView[NUM_CHANNELS];
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index ed3a28a..9c129e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -41,7 +41,7 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -57,8 +57,8 @@ import org.junit.Test;
*
*
*/
-public class IOManagerPerformanceBenchmark
-{
+public class IOManagerPerformanceBenchmark {
+
private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
private static final int[] SEGMENT_SIZES_ALIGNED = { 4096, 16384, 524288 };
@@ -80,10 +80,9 @@ public class IOManagerPerformanceBenchmark
@Before
- public void startup()
- {
+ public void startup() {
memManager = new DefaultMemoryManager(MEMORY_SIZE,1);
- ioManager = new IOManager();
+ ioManager = new IOManagerAsync();
}
@After
@@ -111,7 +110,7 @@ public class IOManagerPerformanceBenchmark
private void testChannelWithSegments(int numSegments) throws Exception
{
final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments);
- final Channel.ID channel = this.ioManager.createChannel();
+ final FileIOChannel.ID channel = this.ioManager.createChannel();
BlockChannelWriter writer = null;
BlockChannelReader reader = null;
@@ -246,7 +245,7 @@ public class IOManagerPerformanceBenchmark
}
private void speedTestStream(int bufferSize) throws IOException {
- final Channel.ID tmpChannel = ioManager.createChannel();
+ final FileIOChannel.ID tmpChannel = ioManager.createChannel();
final IntegerRecord rec = new IntegerRecord(0);
File tempFile = null;
@@ -342,7 +341,7 @@ public class IOManagerPerformanceBenchmark
@SuppressWarnings("resource")
private void speedTestNIO(int bufferSize, boolean direct) throws IOException
{
- final Channel.ID tmpChannel = ioManager.createChannel();
+ final FileIOChannel.ID tmpChannel = ioManager.createChannel();
File tempFile = null;
FileChannel fs = null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index 810b0e2..fc6ea37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -26,10 +26,10 @@ import java.util.List;
import org.junit.Assert;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
+import org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
@@ -39,8 +39,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class IOManagerTest
-{
+public class IOManagerTest {
+
// ------------------------------------------------------------------------
// Cross Test Fields
// ------------------------------------------------------------------------
@@ -54,15 +54,13 @@ public class IOManagerTest
// ------------------------------------------------------------------------
@Before
- public void beforeTest()
- {
+ public void beforeTest() {
this.memoryManager = new DefaultMemoryManager(32 * 1024 * 1024, 1);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
}
@After
- public void afterTest()
- {
+ public void afterTest() {
this.ioManager.shutdown();
Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
@@ -84,10 +82,10 @@ public class IOManagerTest
public void channelEnumerator() {
File tempPath = new File(System.getProperty("java.io.tmpdir"));
- Channel.Enumerator enumerator = ioManager.createChannelEnumerator();
+ FileIOChannel.Enumerator enumerator = ioManager.createChannelEnumerator();
for (int i = 0; i < 10; i++) {
- Channel.ID id = enumerator.next();
+ FileIOChannel.ID id = enumerator.next();
File path = new File(id.getPath());
Assert.assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
@@ -99,12 +97,11 @@ public class IOManagerTest
// ------------------------------------------------------------------------
@Test
- public void channelReadWriteOneSegment()
- {
+ public void channelReadWriteOneSegment() {
final int NUM_IOS = 1111;
try {
- final Channel.ID channelID = this.ioManager.createChannel();
+ final FileIOChannel.ID channelID = this.ioManager.createChannel();
final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
MemorySegment memSeg = this.memoryManager.allocatePages(new DummyInvokable(), 1).get(0);
@@ -143,14 +140,13 @@ public class IOManagerTest
}
@Test
- public void channelReadWriteMultipleSegments()
- {
+ public void channelReadWriteMultipleSegments() {
final int NUM_IOS = 1111;
final int NUM_SEGS = 16;
try {
final List<MemorySegment> memSegs = this.memoryManager.allocatePages(new DummyInvokable(), NUM_SEGS);
- final Channel.ID channelID = this.ioManager.createChannel();
+ final FileIOChannel.ID channelID = this.ioManager.createChannel();
final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
for (int i = 0; i < NUM_IOS; i++) {
@@ -202,29 +198,26 @@ public class IOManagerTest
// ============================================================================================
- final class FailingSegmentReadRequest implements ReadRequest
- {
- private final BlockChannelAccess<ReadRequest, ?> channel;
+ final class FailingSegmentReadRequest implements ReadRequest {
+
+ private final AsynchronousFileIOChannel<ReadRequest> channel;
private final MemorySegment segment;
- protected FailingSegmentReadRequest(BlockChannelAccess<ReadRequest, ?> targetChannel, MemorySegment segment)
- {
+ protected FailingSegmentReadRequest(AsynchronousFileIOChannel<ReadRequest> targetChannel, MemorySegment segment) {
this.channel = targetChannel;
this.segment = segment;
}
@Override
- public void read() throws IOException
- {
+ public void read() throws IOException {
throw new TestIOException();
}
@Override
- public void requestDone(IOException ioex)
- {
+ public void requestDone(IOException ioex) {
this.channel.handleProcessedBuffer(this.segment, ioex);
}
}
@@ -234,36 +227,30 @@ public class IOManagerTest
/**
* Special write request that writes an entire memory segment to the block writer.
*/
- final class FailingSegmentWriteRequest implements WriteRequest
- {
- private final BlockChannelAccess<WriteRequest, ?> channel;
+ final class FailingSegmentWriteRequest implements WriteRequest {
+
+ private final AsynchronousFileIOChannel<WriteRequest> channel;
private final MemorySegment segment;
- protected FailingSegmentWriteRequest(BlockChannelAccess<WriteRequest, ?> targetChannel, MemorySegment segment)
- {
+ protected FailingSegmentWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) {
this.channel = targetChannel;
this.segment = segment;
}
-
@Override
- public void write() throws IOException
- {
+ public void write() throws IOException {
throw new TestIOException();
}
-
@Override
- public void requestDone(IOException ioex)
- {
+ public void requestDone(IOException ioex) {
this.channel.handleProcessedBuffer(this.segment, ioex);
}
}
- final class TestIOException extends IOException
- {
+ final class TestIOException extends IOException {
private static final long serialVersionUID = -814705441998024472L;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 2f30eed..bca6896 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -16,13 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
-
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -32,8 +30,9 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.Record;
import org.junit.Test;
-public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>>
-{
+@SuppressWarnings("deprecation")
+public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> {
+
private static final long CROSS_MEM = 1024 * 1024;
private final double cross_frac;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index 6e66e72..1e0e882 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -32,8 +32,9 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
-{
+@SuppressWarnings("deprecation")
+public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+
private static final long HASH_MEM = 4*1024*1024;
private static final long SORT_MEM = 3*1024*1024;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index a824f77..584bc02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -40,8 +40,9 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
-public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
-{
+@SuppressWarnings("deprecation")
+public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+
private static final long HASH_MEM = 6*1024*1024;
private static final long SORT_MEM = 3*1024*1024;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
index 6dfe82b..21e686d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -61,7 +62,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "deprecation"})
public class HashMatchIteratorITCase {
private static final int MEMORY_SIZE = 16000000; // total memory
@@ -104,12 +105,11 @@ public class HashMatchIteratorITCase {
this.recordPairPairComparator = new RecordIntPairPairComparator();
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
}
@After
- public void afterTest()
- {
+ public void afterTest() {
if (this.ioManager != null) {
this.ioManager.shutdown();
if (!this.ioManager.isProperlyShutDown()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 921151a..2cc9892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
@@ -96,7 +97,7 @@ public class HashTableITCase {
this.pairComparator = new IntPairPairComparator();
this.memManager = new DefaultMemoryManager(32 * 1024 * 1024,1);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
}
@After
@@ -756,7 +757,7 @@ public class HashTableITCase {
}
// create the I/O access for spilling
- final IOManager ioManager = new IOManager();
+ final IOManager ioManager = new IOManagerAsync();
// ----------------------------------------------------------------------------------------
@@ -857,7 +858,7 @@ public class HashTableITCase {
}
// create the I/O access for spilling
- IOManager ioManager = new IOManager();
+ IOManager ioManager = new IOManagerAsync();
// create the map for validating the results
HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
@@ -972,7 +973,7 @@ public class HashTableITCase {
}
// create the I/O access for spilling
- IOManager ioManager = new IOManager();
+ IOManager ioManager = new IOManagerAsync();
// create the map for validating the results
HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index 0fbe98a..a8941a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
@@ -55,7 +56,7 @@ public class HashTablePerformanceComparison {
private final TypePairComparator<IntPair, IntPair> pairComparator = new IntPairPairComparator();
- private IOManager ioManager = new IOManager();
+ private IOManager ioManager = new IOManagerAsync();
@Test
public void testCompactingHashMapPerformance() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
index c1a906c..04d1a38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
@@ -68,7 +69,7 @@ import org.junit.Test;
* Test specialized hash join that keeps the build side data (in memory and on hard disk)
* This is used for iterative tasks.
*/
-
+@SuppressWarnings("deprecation")
public class ReOpenableHashTableITCase {
private static final int PAGE_SIZE = 8 * 1024;
@@ -120,7 +121,7 @@ public class ReOpenableHashTableITCase {
this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index 2bc11f1..4db520e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -56,7 +57,7 @@ public class SpillingResettableIteratorTest {
public void startup() {
// set up IO and memory manager
this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 32 * 1024);
- this.ioman = new IOManager();
+ this.ioman = new IOManagerAsync();
// create test objects
ArrayList<IntValue> objects = new ArrayList<IntValue>(NUM_TESTRECORDS);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index 9eaae9a..10d3534 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -25,6 +25,7 @@ import org.junit.Assert;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -57,7 +58,7 @@ public class SpillingResettableMutableObjectIteratorTest {
public void startup() {
// set up IO and memory manager
this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 32 * 1024);
- this.ioman = new IOManager();
+ this.ioman = new IOManagerAsync();
// create test objects
final ArrayList<Record> objects = new ArrayList<Record>(NUM_TESTRECORDS);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index cbceb8b..852a8f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -80,7 +81,7 @@ public class CombiningUnilateralSortMergerITCase {
@Before
public void beforeTest() {
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
this.serializerFactory = RecordSerializerFactory.get();
this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 8e4bbc0..a340ef2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -81,7 +82,7 @@ public class ExternalSortITCase {
@Before
public void beforeTest() {
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
this.pactRecordSerializer = RecordSerializerFactory.get();
this.pactRecordComparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 257eb87..9dec847 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
@@ -85,7 +86,7 @@ public class MassiveStringSortingITCase {
try {
MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
- IOManager ioMan = new IOManager();
+ IOManager ioMan = new IOManagerAsync();
TypeSerializer<String> serializer = StringSerializer.INSTANCE;
TypeComparator<String> comparator = new StringComparator(true);
@@ -175,7 +176,7 @@ public class MassiveStringSortingITCase {
try {
MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
- IOManager ioMan = new IOManager();
+ IOManager ioMan = new IOManagerAsync();
TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) (TupleTypeInfo<?>) TypeInfoParser.parse("Tuple2<String, String[]>");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
index 766d66c..e12c4ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -53,7 +54,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
+@SuppressWarnings("deprecation")
public class SortMergeMatchIteratorITCase {
// total memory
@@ -85,8 +86,7 @@ public class SortMergeMatchIteratorITCase {
@SuppressWarnings("unchecked")
@Before
- public void beforeTest()
- {
+ public void beforeTest() {
this.serializer1 = RecordSerializer.get();
this.serializer2 = RecordSerializer.get();
this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
@@ -94,12 +94,11 @@ public class SortMergeMatchIteratorITCase {
this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
}
@After
- public void afterTest()
- {
+ public void afterTest() {
if (this.ioManager != null) {
this.ioManager.shutdown();
if (!this.ioManager.isProperlyShutDown()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 4d04bf4..02206f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -92,7 +93,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
this.perSortMem = perSortMemory;
this.perSortFractionMem = (double)perSortMemory/totalMem;
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
this.inputs = new ArrayList<MutableObjectIterator<Record>>();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 5e0edc4..60a81c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
@@ -86,7 +87,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
this.outputs = new LinkedList<OutputGate>();
this.memManager = new DefaultMemoryManager(memorySize, 1);
- this.ioManager = new IOManager(System.getProperty("java.io.tmpdir"));
+ this.ioManager = new IOManagerAsync();
this.inputSplitProvider = inputSplitProvider;
this.mockBuffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 744651d..ba38776 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -47,7 +48,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
+@SuppressWarnings("deprecation")
public class HashVsSortMiniBenchmark {
// total memory
@@ -94,7 +95,7 @@ public class HashVsSortMiniBenchmark {
this.pairComparator11 = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, PAGE_SIZE);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
}
@After
[2/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager
Readers and Writers to interfaces,
add implementation that uses callbacks on completed write requests.
Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 388773d..f77d9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
@@ -32,194 +30,71 @@ import org.apache.flink.core.memory.MemorySegment;
/**
* The facade for the provided I/O manager services.
*/
-public class IOManager implements UncaughtExceptionHandler {
+public abstract class IOManager {
/** Logging */
- private static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
- /**
- * The default temp paths for anonymous Channels.
- */
+ /** The temporary directories for files */
private final String[] paths;
- /**
- * A random number generator for the anonymous ChannelIDs.
- */
+ /** A random number generator for the anonymous ChannelIDs. */
private final Random random;
-
- /**
- * The writer thread used for asynchronous block oriented channel writing.
- */
- private final WriterThread[] writers;
-
- /**
- * The reader threads used for asynchronous block oriented channel reading.
- */
- private final ReaderThread[] readers;
- /**
- * The number of the next path to use.
- */
+ /** The number of the next path to use. */
private volatile int nextPath;
-
- /**
- * A boolean flag indicating whether the close() has already been invoked.
- */
- private volatile boolean isClosed = false;
-
// -------------------------------------------------------------------------
// Constructors / Destructors
// -------------------------------------------------------------------------
/**
- * Constructs a new IOManager, writing channels to the system directory.
- */
- public IOManager() {
- this(System.getProperty("java.io.tmpdir"));
- }
-
- /**
- * Constructs a new IOManager.
- *
- * @param tempDir The base directory path for files underlying channels.
- */
- public IOManager(String tempDir) {
- this(new String[] {tempDir});
- }
-
- /**
* Constructs a new IOManager.
*
* @param paths
* the basic directory paths for files underlying anonymous channels.
*/
- public IOManager(String[] paths) {
+ protected IOManager(String[] paths) {
this.paths = paths;
this.random = new Random();
this.nextPath = 0;
-
- // start a write worker thread for each directory
- this.writers = new WriterThread[paths.length];
- for (int i = 0; i < this.writers.length; i++) {
- final WriterThread t = new WriterThread();
- this.writers[i] = t;
- t.setName("IOManager writer thread #" + (i + 1));
- t.setDaemon(true);
- t.setUncaughtExceptionHandler(this);
- t.start();
- }
-
- // start a reader worker thread for each directory
- this.readers = new ReaderThread[paths.length];
- for (int i = 0; i < this.readers.length; i++) {
- final ReaderThread t = new ReaderThread();
- this.readers[i] = t;
- t.setName("IOManager reader thread #" + (i + 1));
- t.setDaemon(true);
- t.setUncaughtExceptionHandler(this);
- t.start();
- }
}
/**
- * Close method. Shuts down the reader and writer threads immediately, not waiting for their
- * pending requests to be served. This method waits until the threads have actually ceased their
- * operation.
+ * Close method, marks the I/O manager as closed.
*/
- public synchronized final void shutdown()
- {
- if (!this.isClosed) {
- this.isClosed = true;
-
- // close writing and reading threads with best effort and log problems
-
- // --------------------------------- writer shutdown ----------------------------------
- for (int i = 0; i < this.readers.length; i++) {
- try {
- this.writers[i].shutdown();
- }
- catch (Throwable t) {
- LOG.error("Error while shutting down IO Manager writer thread.", t);
- }
- }
-
- // --------------------------------- reader shutdown ----------------------------------
- for (int i = 0; i < this.readers.length; i++) {
- try {
- this.readers[i].shutdown();
- }
- catch (Throwable t) {
- LOG.error("Error while shutting down IO Manager reader thread.", t);
- }
- }
-
- // ------------------------ wait until shutdown is complete ---------------------------
- try {
- for (int i = 0; i < this.readers.length; i++) {
- this.writers[i].join();
- }
- for (int i = 0; i < this.readers.length; i++) {
- this.readers[i].join();
- }
- }
- catch (InterruptedException iex) {}
- }
- }
+ public abstract void shutdown();
/**
- * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
- * to be properly shut down when it is closed and its threads have ceased operation.
+ * Utility method to check whether the IO manager has been properly shut down.
*
* @return True, if the IO manager has properly shut down, false otherwise.
*/
- public final boolean isProperlyShutDown()
- {
- boolean readersShutDown = true;
- for (int i = 0; i < this.readers.length; i++) {
- readersShutDown &= this.readers[i].getState() == Thread.State.TERMINATED;
- }
-
- boolean writersShutDown = true;
- for (int i = 0; i < this.writers.length; i++) {
- readersShutDown &= this.writers[i].getState() == Thread.State.TERMINATED;
- }
-
- return this.isClosed && writersShutDown && readersShutDown;
- }
-
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
- shutdown();
- }
+ public abstract boolean isProperlyShutDown();
// ------------------------------------------------------------------------
// Channel Instantiations
// ------------------------------------------------------------------------
/**
- * Creates a new {@link Channel.ID} in one of the temp directories. Multiple
+ * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple
* invocations of this method spread the channels evenly across the different directories.
*
* @return A channel to a temporary directory.
*/
- public Channel.ID createChannel()
- {
+ public FileIOChannel.ID createChannel() {
final int num = getNextPathNum();
- return new Channel.ID(this.paths[num], num, this.random);
+ return new FileIOChannel.ID(this.paths[num], num, this.random);
}
/**
- * Creates a new {@link Channel.Enumerator}, spreading the channels in a round-robin fashion
+ * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion
* across the temporary file directories.
*
* @return An enumerator for channels.
*/
- public Channel.Enumerator createChannelEnumerator()
- {
- return new Channel.Enumerator(this.paths, this.random);
+ public FileIOChannel.Enumerator createChannelEnumerator() {
+ return new FileIOChannel.Enumerator(this.paths, this.random);
}
@@ -228,199 +103,65 @@ public class IOManager implements UncaughtExceptionHandler {
// ------------------------------------------------------------------------
/**
- * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
- * accepting write request, carrying them out at some time and returning the written segment to the given queue
- * afterwards.
+ * Creates a block channel writer that writes to the given channel. The writer adds the
+ * written segment to its return-queue afterwards (to allow for asynchronous implementations).
*
* @param channelID The descriptor for the channel to write to.
- * @param returnQueue The queue to put the written buffers into.
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID,
- LinkedBlockingQueue<MemorySegment> returnQueue)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue, 1);
+ public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
+ return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
}
/**
- * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
- * accepting write request, carrying them out at some time and returning the written segment to the given queue
- * afterwards.
- * <p>
- * The writer will collect a specified number of write requests and carry them out
- * in one, effectively writing one block in the size of multiple memory pages.
- * Note that this means that no memory segment will reach the return queue before
- * the given number of requests are collected, so the number of buffers used with
- * the writer should be greater than the number of requests to combine. Ideally,
- * the number of memory segments used is a multiple of the number of requests to
- * combine.
+ * Creates a block channel writer that writes to the given channel. The writer adds the
+ * written segment to the given queue (to allow for asynchronous implementations).
*
* @param channelID The descriptor for the channel to write to.
* @param returnQueue The queue to put the written buffers into.
- * @param numRequestsToCombine The number of write requests to combine to one I/O request.
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID,
- LinkedBlockingQueue<MemorySegment> returnQueue, int numRequestsToCombine)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue, numRequestsToCombine);
- }
+ public abstract BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+ LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
/**
- * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
- * accepting write request, carrying them out at some time and returning the written segment its return queue afterwards.
+ * Creates a block channel writer that writes to the given channel. The writer calls the given callback
+ * after the I/O operation has been performed (successfully or unsuccessfully), to allow
+ * for asynchronous implementations.
*
* @param channelID The descriptor for the channel to write to.
+ * @param callback The callback to be called for
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), 1);
- }
+ public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException;
/**
- * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
- * accepting write request, carrying them out at some time and returning the written segment its return queue afterwards.
- * <p>
- * The writer will collect a specified number of write requests and carry them out
- * in one, effectively writing one block in the size of multiple memory pages.
- * Note that this means that no memory segment will reach the return queue before
- * the given number of requests are collected, so the number of buffers used with
- * the writer should be greater than the number of requests to combine. Ideally,
- * the number of memory segments used is a multiple of the number of requests to
- * combine.
+ * Creates a block channel reader that reads blocks from the given channel. The reader pushed
+ * full memory segments (with the read data) to its "return queue", to allow for asynchronous read
+ * implementations.
*
* @param channelID The descriptor for the channel to write to.
- * @param numRequestsToCombine The number of write requests to combine to one I/O request.
- * @return A block channel writer that writes to the given channel.
- * @throws IOException Thrown, if the channel for the writer could not be opened.
- */
- public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID, int numRequestsToCombine)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), numRequestsToCombine);
- }
-
- /**
- * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
- * such that a read request is accepted, carried out at some (close) point in time, and the full segment
- * is pushed to the given queue.
- *
- * @param channelID The descriptor for the channel to write to.
- * @param returnQueue The queue to put the full buffers into.
* @return A block channel reader that reads from the given channel.
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
- public BlockChannelReader createBlockChannelReader(Channel.ID channelID,
- LinkedBlockingQueue<MemorySegment> returnQueue)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue, 1);
+ public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
+ return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
}
/**
- * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
- * such that a read request is accepted, carried out at some (close) point in time, and the full segment
- * is pushed to the given queue.
- * <p>
- * The reader will collect a specified number of read requests and carry them out
- * in one, effectively reading one block in the size of multiple memory pages.
- * Note that this means that no memory segment will reach the return queue before
- * the given number of requests are collected, so the number of buffers used with
- * the reader should be greater than the number of requests to combine. Ideally,
- * the number of memory segments used is a multiple of the number of requests to
- * combine.
+ * Creates a block channel reader that reads blocks from the given channel. The reader pushes the full segments
+ * to the given queue, to allow for asynchronous implementations.
*
* @param channelID The descriptor for the channel to write to.
* @param returnQueue The queue to put the full buffers into.
- * @param numRequestsToCombine The number of read requests to combine to one I/O request.
- * @return A block channel reader that reads from the given channel.
- * @throws IOException Thrown, if the channel for the reader could not be opened.
- */
- public BlockChannelReader createBlockChannelReader(Channel.ID channelID,
- LinkedBlockingQueue<MemorySegment> returnQueue, int numRequestsToCombine)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue, numRequestsToCombine);
- }
-
- /**
- * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
- * such that a read request is accepted, carried out at some (close) point in time, and the full segment
- * is pushed to the reader's return queue.
- *
- * @param channelID The descriptor for the channel to write to.
* @return A block channel reader that reads from the given channel.
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
- public BlockChannelReader createBlockChannelReader(Channel.ID channelID)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), 1);
- }
-
- /**
- * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
- * such that a read request is accepted, carried out at some (close) point in time, and the full segment
- * is pushed to the reader's return queue.
- * <p>
- * The reader will collect a specified number of read requests and carry them out
- * in one, effectively reading one block in the size of multiple memory pages.
- * Note that this means that no memory segment will reach the return queue before
- * the given number of requests are collected, so the number of buffers used with
- * the reader should be greater than the number of requests to combine. Ideally,
- * the number of memory segments used is a multiple of the number of requests to
- * combine.
- *
- * @param channelID The descriptor for the channel to write to.
- * @param numRequestsToCombine The number of write requests to combine to one I/O request.
- * @return A block channel reader that reads from the given channel.
- * @throws IOException Thrown, if the channel for the reader could not be opened.
- */
- public BlockChannelReader createBlockChannelReader(Channel.ID channelID, int numRequestsToCombine)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue,
- new LinkedBlockingQueue<MemorySegment>(), numRequestsToCombine);
- }
+ public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
+ LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
/**
* Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
@@ -437,217 +178,17 @@ public class IOManager implements UncaughtExceptionHandler {
* @return A block channel reader that reads from the given channel.
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
- public BulkBlockChannelReader createBulkBlockChannelReader(Channel.ID channelID,
- List<MemorySegment> targetSegments, int numBlocks)
- throws IOException
- {
- if (this.isClosed) {
- throw new IllegalStateException("I/O-Manger is closed.");
- }
-
- return new BulkBlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
- }
+ public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
+ List<MemorySegment> targetSegments, int numBlocks) throws IOException;
- // ========================================================================
- // Utilities
- // ========================================================================
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
- private final int getNextPathNum()
- {
+ protected int getNextPathNum() {
final int next = this.nextPath;
final int newNext = next + 1;
this.nextPath = newNext >= this.paths.length ? 0 : newNext;
return next;
}
-
-
- // ========================================================================
- // I/O Worker Threads
- // ========================================================================
-
- /**
- * A worker thread for asynchronous read.
- *
- */
- private static final class ReaderThread extends Thread
- {
- protected final RequestQueue<ReadRequest> requestQueue;
-
- private volatile boolean alive;
-
- // ---------------------------------------------------------------------
- // Constructors / Destructors
- // ---------------------------------------------------------------------
-
- protected ReaderThread()
- {
- this.requestQueue = new RequestQueue<ReadRequest>();
- this.alive = true;
- }
-
- /**
- * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
- * immediately. All buffers of pending requests are handed back to their channel readers and an exception is
- * reported to them, declaring their request queue as closed.
- */
- protected void shutdown()
- {
- if (this.alive) {
- // shut down the thread
- try {
- this.alive = false;
- this.requestQueue.close();
- this.interrupt();
- }
- catch (Throwable t) {}
- }
-
- // notify all pending write requests that the thread has been shut down
- IOException ioex = new IOException("IO-Manager has been closed.");
-
- while (!this.requestQueue.isEmpty()) {
- ReadRequest request = this.requestQueue.poll();
- request.requestDone(ioex);
- }
- }
-
- // ---------------------------------------------------------------------
- // Main loop
- // ---------------------------------------------------------------------
-
- @Override
- public void run()
- {
- while (this.alive)
- {
-
- // get the next buffer. ignore interrupts that are not due to a shutdown.
- ReadRequest request = null;
- while (request == null) {
- try {
- request = this.requestQueue.take();
- }
- catch (InterruptedException iex) {
- if (!this.alive) {
- // exit
- return;
- }
- }
- }
-
- // remember any IO exception that occurs, so it can be reported to the writer
- IOException ioex = null;
-
- try {
- // read buffer from the specified channel
- request.read();
- }
- catch (IOException e) {
- ioex = e;
- }
- catch (Throwable t) {
- ioex = new IOException("The buffer could not be read: " + t.getMessage(), t);
- IOManager.LOG.error("I/O reading thread encountered an error" +
- t.getMessage() == null ? "." : ": ", t);
- }
-
- // invoke the processed buffer handler of the request issuing reader object
- request.requestDone(ioex);
- } // end while alive
- }
-
- } // end reading thread
-
- /**
- * A worker thread that asynchronously writes the buffers to disk.
- */
- private static final class WriterThread extends Thread
- {
- protected final RequestQueue<WriteRequest> requestQueue;
-
- private volatile boolean alive;
-
- // ---------------------------------------------------------------------
- // Constructors / Destructors
- // ---------------------------------------------------------------------
-
- protected WriterThread()
- {
- this.requestQueue = new RequestQueue<WriteRequest>();
- this.alive = true;
- }
-
- /**
- * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
- * immediately. All buffers of pending requests are handed back to their channel writers and an exception is
- * reported to them, declaring their request queue as closed.
- */
- protected void shutdown()
- {
- if (this.alive) {
- // shut down the thread
- try {
- this.alive = false;
- this.requestQueue.close();
- this.interrupt();
- }
- catch (Throwable t) {}
-
- // notify all pending write requests that the thread has been shut down
- IOException ioex = new IOException("Writer thread has been closed.");
-
- while (!this.requestQueue.isEmpty())
- {
- WriteRequest request = this.requestQueue.poll();
- request.requestDone(ioex);
- }
- }
- }
-
- // ---------------------------------------------------------------------
- // Main loop
- // ---------------------------------------------------------------------
-
- @Override
- public void run()
- {
- while (this.alive) {
-
- WriteRequest request = null;
-
- // get the next buffer. ignore interrupts that are not due to a shutdown.
- while (request == null) {
- try {
- request = requestQueue.take();
- }
- catch (InterruptedException iex) {
- if (!this.alive) {
- // exit
- return;
- }
- }
- }
-
- // remember any IO exception that occurs, so it can be reported to the writer
- IOException ioex = null;
-
- try {
- // write buffer to the specified channel
- request.write();
- }
- catch (IOException e) {
- ioex = e;
- }
- catch (Throwable t) {
- ioex = new IOException("The buffer could not be written: " + t.getMessage(), t);
- IOManager.LOG.error("I/O reading thread encountered an error" +
- t.getMessage() == null ? "." : ": ", t);
- }
-
- // invoke the processed buffer handler of the request issuing writer object
- request.requestDone(ioex);
- } // end while alive
- }
-
- }; // end writer thread
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
new file mode 100644
index 0000000..1f79067
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A version of the {@link IOManager} that uses asynchronous I/O.
+ */
+public class IOManagerAsync extends IOManager implements UncaughtExceptionHandler {
+
+ /** The writer threads used for asynchronous block oriented channel writing. */
+ private final WriterThread[] writers;
+
+ /** The reader threads used for asynchronous block oriented channel reading. */
+ private final ReaderThread[] readers;
+
+ /** Lock object to guard shutdown */
+ private final Object shutdownLock = new Object();
+
+ /** Flag to mark the I/O manager as alive or shut down */
+ private volatile boolean shutdown;
+
+ // -------------------------------------------------------------------------
+ // Constructors / Destructors
+ // -------------------------------------------------------------------------
+
+ /**
+ * Constructs a new asynchronous I/O manger, writing files to the system 's temp directory.
+ */
+ public IOManagerAsync() {
+ this(EnvironmentInformation.getTemporaryFileDirectory());
+ }
+
+ /**
+ * Constructs a new asynchronous I/O manger, writing file to the given directory.
+ *
+ * @param tempDir The directory to write temporary files to.
+ */
+ public IOManagerAsync(String tempDir) {
+ this(new String[] {tempDir});
+ }
+
+ /**
+ * Constructs a new asynchronous I/O manger, writing file round robin across the given directories.
+ *
+ * @param tempDirs The directories to write temporary files to.
+ */
+ public IOManagerAsync(String[] tempDirs) {
+ super(tempDirs);
+
+ // start a write worker thread for each directory
+ this.writers = new WriterThread[tempDirs.length];
+ for (int i = 0; i < this.writers.length; i++) {
+ final WriterThread t = new WriterThread();
+ this.writers[i] = t;
+ t.setName("IOManager writer thread #" + (i + 1));
+ t.setDaemon(true);
+ t.setUncaughtExceptionHandler(this);
+ t.start();
+ }
+
+ // start a reader worker thread for each directory
+ this.readers = new ReaderThread[tempDirs.length];
+ for (int i = 0; i < this.readers.length; i++) {
+ final ReaderThread t = new ReaderThread();
+ this.readers[i] = t;
+ t.setName("IOManager reader thread #" + (i + 1));
+ t.setDaemon(true);
+ t.setUncaughtExceptionHandler(this);
+ t.start();
+ }
+ }
+
+ /**
+ * Close method. Shuts down the reader and writer threads immediately, not waiting for their
+ * pending requests to be served. This method waits until the threads have actually ceased their
+ * operation.
+ */
+ @Override
+ public void shutdown() {
+ synchronized (shutdownLock) {
+ if (shutdown) {
+ return;
+ }
+
+ shutdown = true;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutting down I/O manager.");
+ }
+
+ // close writing and reading threads with best effort and log problems
+ // first notify all to close, then wait until all are closed
+
+ for (WriterThread wt : writers) {
+ try {
+ wt.shutdown();
+ }
+ catch (Throwable t) {
+ LOG.error("Error while shutting down IO Manager writer thread.", t);
+ }
+ }
+ for (ReaderThread rt : readers) {
+ try {
+ rt.shutdown();
+ }
+ catch (Throwable t) {
+ LOG.error("Error while shutting down IO Manager reader thread.", t);
+ }
+ }
+ try {
+ for (WriterThread wt : writers) {
+ wt.join();
+ }
+ for (ReaderThread rt : readers) {
+ rt.join();
+ }
+ }
+ catch (InterruptedException iex) {}
+ }
+ }
+
+ /**
+ * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
+ * to be properly shut down when it is closed and its threads have ceased operation.
+ *
+ * @return True, if the IO manager has properly shut down, false otherwise.
+ */
+ @Override
+ public boolean isProperlyShutDown() {
+ boolean readersShutDown = true;
+ for (ReaderThread rt : readers) {
+ readersShutDown &= rt.getState() == Thread.State.TERMINATED;
+ }
+
+ boolean writersShutDown = true;
+ for (WriterThread wt : writers) {
+ readersShutDown &= wt.getState() == Thread.State.TERMINATED;
+ }
+
+ return shutdown && writersShutDown && readersShutDown;
+ }
+
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
+ shutdown();
+ }
+
+ // ------------------------------------------------------------------------
+ // Reader / Writer instantiations
+ // ------------------------------------------------------------------------
+
+ @Override
+ public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+ LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
+ {
+ Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+ return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
+ }
+
+ @Override
+ public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
+ Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+ return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
+ }
+
+ /**
+ * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
+ * such that a read request is accepted, carried out at some (close) point in time, and the full segment
+ * is pushed to the given queue.
+ *
+ * @param channelID The descriptor for the channel to write to.
+ * @param returnQueue The queue to put the full buffers into.
+ * @return A block channel reader that reads from the given channel.
+ * @throws IOException Thrown, if the channel for the reader could not be opened.
+ */
+ @Override
+ public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
+ LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
+ {
+ Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+ return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
+ }
+
+ /**
+ * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
+ * The reader draws segments to read the blocks into from a supplied list, which must contain as many
+ * segments as the channel has blocks. After the reader is done, the list with the full segments can be
+ * obtained from the reader.
+ * <p>
+ * If a channel is not to be read in one bulk, but in multiple smaller batches, a
+ * {@link BlockChannelReader} should be used.
+ *
+ * @param channelID The descriptor for the channel to write to.
+ * @param targetSegments The list to take the segments from into which to read the data.
+ * @param numBlocks The number of blocks in the channel to read.
+ * @return A block channel reader that reads from the given channel.
+ * @throws IOException Thrown, if the channel for the reader could not be opened.
+ */
+ @Override
+ public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
+ List<MemorySegment> targetSegments, int numBlocks) throws IOException
+ {
+ Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+ return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
+ }
+
+ // -------------------------------------------------------------------------
+ // I/O Worker Threads
+ // -------------------------------------------------------------------------
+
+ /**
+ * A worker thread for asynchronous reads.
+ */
+ private static final class ReaderThread extends Thread {
+
+ protected final RequestQueue<ReadRequest> requestQueue;
+
+ private volatile boolean alive;
+
+ // ---------------------------------------------------------------------
+ // Constructors / Destructors
+ // ---------------------------------------------------------------------
+
+ protected ReaderThread() {
+ this.requestQueue = new RequestQueue<ReadRequest>();
+ this.alive = true;
+ }
+
+ /**
+ * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
+ * immediately. All buffers of pending requests are handed back to their channel readers and an exception is
+ * reported to them, declaring their request queue as closed.
+ */
+ protected void shutdown() {
+ synchronized (this) {
+ if (alive) {
+ alive = false;
+ requestQueue.close();
+ interrupt();
+ }
+
+ try {
+ join(1000);
+ }
+ catch (InterruptedException e) {}
+
+ // notify all pending write requests that the thread has been shut down
+ IOException ioex = new IOException("IO-Manager has been closed.");
+
+ while (!this.requestQueue.isEmpty()) {
+ ReadRequest request = this.requestQueue.poll();
+ if (request != null) {
+ try {
+ request.requestDone(ioex);
+ }
+ catch (Throwable t) {
+ IOManagerAsync.LOG.error("The handler of the request complete callback threw an exception"
+ + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+ }
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // Main loop
+ // ---------------------------------------------------------------------
+
+ @Override
+ public void run() {
+
+ while (alive) {
+
+ // get the next buffer. ignore interrupts that are not due to a shutdown.
+ ReadRequest request = null;
+ while (alive && request == null) {
+ try {
+ request = this.requestQueue.take();
+ }
+ catch (InterruptedException e) {
+ if (!this.alive) {
+ return;
+ } else {
+ IOManagerAsync.LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
+ }
+ }
+ }
+
+ // remember any IO exception that occurs, so it can be reported to the writer
+ IOException ioex = null;
+
+ try {
+ // read buffer from the specified channel
+ request.read();
+ }
+ catch (IOException e) {
+ ioex = e;
+ }
+ catch (Throwable t) {
+ ioex = new IOException("The buffer could not be read: " + t.getMessage(), t);
+ IOManagerAsync.LOG.error("I/O reading thread encountered an error" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+ }
+
+ // invoke the processed buffer handler of the request issuing reader object
+ try {
+ request.requestDone(ioex);
+ }
+ catch (Throwable t) {
+ IOManagerAsync.LOG.error("The handler of the request-complete-callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+ }
+ } // end while alive
+ }
+
+ } // end reading thread
+
+ /**
+ * A worker thread that asynchronously writes the buffers to disk.
+ */
+ private static final class WriterThread extends Thread {
+
+ protected final RequestQueue<WriteRequest> requestQueue;
+
+ private volatile boolean alive;
+
+ // ---------------------------------------------------------------------
+ // Constructors / Destructors
+ // ---------------------------------------------------------------------
+
+ protected WriterThread() {
+ this.requestQueue = new RequestQueue<WriteRequest>();
+ this.alive = true;
+ }
+
+ /**
+ * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
+ * immediately. All buffers of pending requests are handed back to their channel writers and an exception is
+ * reported to them, declaring their request queue as closed.
+ */
+ protected void shutdown() {
+ synchronized (this) {
+ if (alive) {
+ alive = false;
+ requestQueue.close();
+ interrupt();
+ }
+
+ try {
+ join(1000);
+ }
+ catch (InterruptedException e) {}
+
+ // notify all pending write requests that the thread has been shut down
+ IOException ioex = new IOException("IO-Manager has been closed.");
+
+ while (!this.requestQueue.isEmpty()) {
+ WriteRequest request = this.requestQueue.poll();
+ if (request != null) {
+ try {
+ request.requestDone(ioex);
+ }
+ catch (Throwable t) {
+ IOManagerAsync.LOG.error("The handler of the request complete callback threw an exception"
+ + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+ }
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // Main loop
+ // ---------------------------------------------------------------------
+
+ @Override
+ public void run() {
+
+ while (this.alive) {
+
+ WriteRequest request = null;
+
+ // get the next buffer. ignore interrupts that are not due to a shutdown.
+ while (alive && request == null) {
+ try {
+ request = requestQueue.take();
+ }
+ catch (InterruptedException e) {
+ if (!this.alive) {
+ return;
+ } else {
+ IOManagerAsync.LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
+ }
+ }
+ }
+
+ // remember any IO exception that occurs, so it can be reported to the writer
+ IOException ioex = null;
+
+ try {
+ // write buffer to the specified channel
+ request.write();
+ }
+ catch (IOException e) {
+ ioex = e;
+ }
+ catch (Throwable t) {
+ ioex = new IOException("The buffer could not be written: " + t.getMessage(), t);
+ IOManagerAsync.LOG.error("I/O writing thread encountered an error" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+ }
+
+ // invoke the processed buffer handler of the request issuing writer object
+ try {
+ request.requestDone(ioex);
+ }
+ catch (Throwable t) {
+ IOManagerAsync.LOG.error("The handler of the request-complete-callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+ }
+ } // end while alive
+ }
+
+ }; // end writer thread
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
index d24e8c7..1b7adae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
@@ -16,23 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
-
/**
* Basic interface that I/O requests that are sent to the threads of the I/O manager need to implement.
- *
*/
-interface IORequest
-{
+interface IORequest {
+
/**
* Method that is called by the target I/O thread after the request has been processed.
*
- * @param ioex The exception that occurred while processing the I/O request. Is <tt>null</tt> if everything
- * was fine.
+ * @param ioex The exception that occurred while processing the I/O request. Is <tt>null</tt> if everything was fine.
*/
public void requestDone(IOException ioex);
}
@@ -41,8 +37,8 @@ interface IORequest
/**
* Interface for I/O requests that are handled by the IOManager's reading thread.
*/
-interface ReadRequest extends IORequest
-{
+interface ReadRequest extends IORequest {
+
/**
* Called by the target I/O thread to perform the actual reading operation.
*
@@ -55,8 +51,8 @@ interface ReadRequest extends IORequest
/**
* Interface for I/O requests that are handled by the IOManager's writing thread.
*/
-interface WriteRequest extends IORequest
-{
+interface WriteRequest extends IORequest {
+
/**
* Called by the target I/O thread to perform the actual writing operation.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
new file mode 100644
index 0000000..78699e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A {@link RequestDoneCallback} that adds the memory segments to a blocking queue.
+ */
+public class QueuingCallback implements RequestDoneCallback {
+
+ private final LinkedBlockingQueue<MemorySegment> queue;
+
+ public QueuingCallback(LinkedBlockingQueue<MemorySegment> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void requestSuccessful(MemorySegment buffer) {
+ queue.add(buffer);
+ }
+
+ @Override
+ public void requestFailed(MemorySegment buffer, IOException e) {
+ // the I/O error is recorded in the writer already
+ queue.add(buffer);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
new file mode 100644
index 0000000..982343c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Callback to be executed on completion of an asynchronous I/O request.
+ * Depending on success or failure, either method of
+ * {@ink #requestSuccessful(MemorySegment)} or {@link #requestFailed(MemorySegment, IOException)}
+ * is called.
+ */
+public interface RequestDoneCallback {
+
+ void requestSuccessful(MemorySegment buffer);
+
+ void requestFailed(MemorySegment buffer, IOException e);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
index b506380..230bf50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
@@ -16,29 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
-
import java.io.Closeable;
-import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-
/**
* A {@link LinkedBlockingQueue} that is extended with closing methods.
- *
*/
-public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Closeable
-{
- /**
- * UID for serialization interoperability.
- */
+public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Closeable {
+
private static final long serialVersionUID = 3804115535778471680L;
- /**
- * Flag marking this queue as closed.
- */
+ /** Flag marking this queue as closed. */
private volatile boolean closed = false;
/**
@@ -47,7 +37,7 @@ public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Clo
* @see java.io.Closeable#close()
*/
@Override
- public void close() throws IOException {
+ public void close() {
this.closed = true;
}
@@ -56,9 +46,7 @@ public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Clo
*
* @return True, if the queue is closed, false otherwise.
*/
- public boolean isClosed()
- {
+ public boolean isClosed() {
return this.closed;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
new file mode 100644
index 0000000..fd6c230
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+/**
+ * A base class for synchronous readers and writers.
+ */
+public abstract class SynchronousFileIOChannel extends AbstractFileIOChannel {
+
+ protected SynchronousFileIOChannel(FileIOChannel.ID channelID, boolean writeEnabled) throws IOException {
+ super(channelID, writeEnabled);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean isClosed() {
+ return !this.fileChannel.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.fileChannel.isOpen()) {
+ this.fileChannel.close();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index b5504f3..a02e81c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -30,7 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
@@ -49,7 +49,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
private final IOManager ioManager;
- private final Channel.Enumerator channelEnumerator;
+ private final FileIOChannel.Enumerator channelEnumerator;
private final int numSegmentsSpillingThreshold;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 020da9d..6d3194b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.SeekableDataInputView;
import org.apache.flink.core.memory.SeekableDataOutputView;
import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
@@ -282,7 +282,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
* @return The number of buffers that were freed by spilling this partition.
* @throws IOException Thrown, if the writing failed.
*/
- public int spillPartition(List<MemorySegment> target, IOManager ioAccess, Channel.ID targetChannel,
+ public int spillPartition(List<MemorySegment> target, IOManager ioAccess, FileIOChannel.ID targetChannel,
LinkedBlockingQueue<MemorySegment> bufferReturnQueue)
throws IOException
{
@@ -311,7 +311,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
return this.buildSideWriteBuffer.spill(this.buildSideChannel);
}
- public void finalizeBuildPhase(IOManager ioAccess, Channel.Enumerator probeChannelEnumerator,
+ public void finalizeBuildPhase(IOManager ioAccess, FileIOChannel.Enumerator probeChannelEnumerator,
LinkedBlockingQueue<MemorySegment> bufferReturnQueue)
throws IOException
{
@@ -449,7 +449,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
// ReOpenableHashTable related methods
// --------------------------------------------------------------------------------------------------
- public void prepareProbePhase(IOManager ioAccess, Channel.Enumerator probeChannelEnumerator,
+ public void prepareProbePhase(IOManager ioAccess, FileIOChannel.Enumerator probeChannelEnumerator,
LinkedBlockingQueue<MemorySegment> bufferReturnQueue) throws IOException {
if (isInMemory()) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 21bd8df..1bbf246 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -37,7 +37,7 @@ import org.apache.flink.core.memory.SeekableDataOutputView;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -306,7 +306,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
* The channel enumerator that is used while processing the current partition to create
* channels for the spill partitions it requires.
*/
- protected Channel.Enumerator currentEnumerator;
+ protected FileIOChannel.Enumerator currentEnumerator;
/**
* The array of memory segments that contain the buckets which form the actual hash-table
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
index 3695aea..56dcfae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
@@ -28,14 +28,14 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
protected int initialPartitionBuffersCount = -1; // stores the number of buffers used for an in-memory partition after the build phase has finished.
- private Channel.ID initialBuildSideChannel = null; // path to initial build side contents (only for in-memory partitions)
+ private FileIOChannel.ID initialBuildSideChannel = null; // path to initial build side contents (only for in-memory partitions)
private BlockChannelWriter initialBuildSideWriter = null;
@@ -97,7 +97,7 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
*
* @return Number of memorySegments in the writeBehindBuffers!
*/
- int spillInMemoryPartition(Channel.ID targetChannel, IOManager ioManager, LinkedBlockingQueue<MemorySegment> writeBehindBuffers) throws IOException {
+ int spillInMemoryPartition(FileIOChannel.ID targetChannel, IOManager ioManager, LinkedBlockingQueue<MemorySegment> writeBehindBuffers) throws IOException {
this.initialPartitionBuffersCount = partitionBuffers.length; // for ReOpenableHashMap
this.initialBuildSideChannel = targetChannel;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index 5099f38..6819924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.util.MutableObjectIterator;
@@ -36,7 +36,7 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
/**
* Channel for the spilled partitions
*/
- private final Channel.Enumerator spilledInMemoryPartitions;
+ private final FileIOChannel.Enumerator spilledInMemoryPartitions;
/**
* Stores the initial partitions and a list of the files that contain the spilled contents
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index e566e25..63e64c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.sort;
import java.io.IOException;
@@ -36,9 +35,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -267,7 +265,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
}
- final Channel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
+ final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
@@ -296,7 +294,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
}
// open next channel
- Channel.ID channel = enumerator.next();
+ FileIOChannel.ID channel = enumerator.next();
registerChannelToBeRemovedAtShudown(channel);
if (LOG.isDebugEnabled()) {
@@ -304,8 +302,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
}
// create writer
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
- channel, this.numWriteBuffersToCluster);
+ final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
this.memManager.getPageSize());
@@ -420,7 +417,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// get the readers and register them to be released
final MergeIterator<E> mergeIterator = getMergingIterator(
- channelIDs, readBuffers, new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size()));
+ channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()));
// set the target for the user iterator
// if the final merge combines, create a combining iterator around the merge iterator,
@@ -452,17 +449,16 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
throws IOException
{
// the list with the readers, to be closed at shutdown
- final List<BlockChannelAccess<?, ?>> channelAccesses = new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size());
+ final List<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
// the list with the target iterators
final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses);
final KeyGroupedIterator<E> groupedIter = new KeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2);
// create a new channel writer
- final Channel.ID mergedChannelID = this.ioManager.createChannel();
+ final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
registerChannelToBeRemovedAtShudown(mergedChannelID);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
- mergedChannelID, this.numWriteBuffersToCluster);
+ final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers,
this.memManager.getPageSize());
@@ -488,7 +484,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// remove the merged channel readers from the clear-at-shutdown list
for (int i = 0; i < channelAccesses.size(); i++) {
- BlockChannelAccess<?, ?> access = channelAccesses.get(i);
+ FileIOChannel access = channelAccesses.get(i);
access.closeAndDelete();
unregisterOpenChannelToBeRemovedAtShudown(access);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 3dd21f5..459ef82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -37,14 +37,13 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.Channel.ID;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -135,12 +134,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
/**
* Collection of all currently open channels, to be closed and deleted during cleanup.
*/
- private final HashSet<BlockChannelAccess<?, ?>> openChannels;
+ private final HashSet<FileIOChannel> openChannels;
/**
* Collection of all temporary files created and to be removed when closing the sorter.
*/
- private final HashSet<Channel.ID> channelsToDeleteAtShutdown;
+ private final HashSet<FileIOChannel.ID> channelsToDeleteAtShutdown;
/**
* The monitor which guards the iterator field.
@@ -387,8 +386,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
};
// create sets that track the channels we need to clean up when closing the sorter
- this.channelsToDeleteAtShutdown = new HashSet<Channel.ID>(64);
- this.openChannels = new HashSet<BlockChannelAccess<?,?>>(64);
+ this.channelsToDeleteAtShutdown = new HashSet<FileIOChannel.ID>(64);
+ this.openChannels = new HashSet<FileIOChannel>(64);
// start the thread that reads the input channels
this.readThread = getReadingThread(exceptionHandler, input, circularQueues, parentTask,
@@ -519,8 +518,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
// we have to loop this, because it may fail with a concurrent modification exception
while (!this.openChannels.isEmpty()) {
try {
- for (Iterator<BlockChannelAccess<?, ?>> channels = this.openChannels.iterator(); channels.hasNext(); ) {
- final BlockChannelAccess<?, ?> channel = channels.next();
+ for (Iterator<FileIOChannel> channels = this.openChannels.iterator(); channels.hasNext(); ) {
+ final FileIOChannel channel = channels.next();
channels.remove();
channel.closeAndDelete();
}
@@ -531,8 +530,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
// we have to loop this, because it may fail with a concurrent modification exception
while (!this.channelsToDeleteAtShutdown.isEmpty()) {
try {
- for (Iterator<Channel.ID> channels = this.channelsToDeleteAtShutdown.iterator(); channels.hasNext(); ) {
- final Channel.ID channel = channels.next();
+ for (Iterator<FileIOChannel.ID> channels = this.channelsToDeleteAtShutdown.iterator(); channels.hasNext(); ) {
+ final FileIOChannel.ID channel = channels.next();
channels.remove();
try {
final File f = new File(channel.getPath());
@@ -1257,7 +1256,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
// ------------------- Spilling Phase ------------------------
- final Channel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
+ final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
@@ -1286,12 +1285,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
}
// open next channel
- Channel.ID channel = enumerator.next();
+ FileIOChannel.ID channel = enumerator.next();
registerChannelToBeRemovedAtShudown(channel);
// create writer
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
- channel, this.numWriteBuffersToCluster);
+ final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
this.memManager.getPageSize());
@@ -1351,7 +1349,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
getSegmentsForReaders(readBuffers, this.sortReadMemory, channelIDs.size());
// get the readers and register them to be released
- setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size())));
+ setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size())));
}
// done
@@ -1405,7 +1403,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
* @throws IOException Thrown, if the readers encounter an I/O problem.
*/
protected final MergeIterator<E> getMergingIterator(final List<ChannelWithBlockCount> channelIDs,
- final List<List<MemorySegment>> inputSegments, List<BlockChannelAccess<?, ?>> readerList)
+ final List<List<MemorySegment>> inputSegments, List<FileIOChannel> readerList)
throws IOException
{
// create one iterator per channel id
@@ -1420,9 +1418,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
final List<MemorySegment> segsForChannel = inputSegments.get(i);
// create a reader. if there are multiple segments for the reader, issue multiple together per I/O request
- final BlockChannelReader reader = segsForChannel.size() >= 4 ?
- this.ioManager.createBlockChannelReader(channel.getChannel(), segsForChannel.size() / 2) :
- this.ioManager.createBlockChannelReader(channel.getChannel());
+ final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel.getChannel());
readerList.add(reader);
registerOpenChannelToBeRemovedAtShudown(reader);
@@ -1495,16 +1491,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
throws IOException
{
// the list with the readers, to be closed at shutdown
- final List<BlockChannelAccess<?, ?>> channelAccesses = new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size());
+ final List<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
// the list with the target iterators
final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses);
// create a new channel writer
- final Channel.ID mergedChannelID = this.ioManager.createChannel();
+ final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
registerChannelToBeRemovedAtShudown(mergedChannelID);
- final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
- mergedChannelID, this.numWriteBuffersToCluster);
+ final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers,
this.memManager.getPageSize());
@@ -1523,7 +1518,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
// remove the merged channel readers from the clear-at-shutdown list
for (int i = 0; i < channelAccesses.size(); i++) {
- BlockChannelAccess<?, ?> access = channelAccesses.get(i);
+ FileIOChannel access = channelAccesses.get(i);
access.closeAndDelete();
unregisterOpenChannelToBeRemovedAtShudown(access);
}
@@ -1577,7 +1572,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
*
* @param channel The channel id.
*/
- protected void registerChannelToBeRemovedAtShudown(Channel.ID channel) {
+ protected void registerChannelToBeRemovedAtShudown(FileIOChannel.ID channel) {
UnilateralSortMerger.this.channelsToDeleteAtShutdown.add(channel);
}
@@ -1586,7 +1581,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
*
* @param channel The channel id.
*/
- protected void unregisterChannelToBeRemovedAtShudown(Channel.ID channel) {
+ protected void unregisterChannelToBeRemovedAtShudown(FileIOChannel.ID channel) {
UnilateralSortMerger.this.channelsToDeleteAtShutdown.remove(channel);
}
@@ -1595,7 +1590,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
*
* @param channel The channel reader/writer.
*/
- protected void registerOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
+ protected void registerOpenChannelToBeRemovedAtShudown(FileIOChannel channel) {
UnilateralSortMerger.this.openChannels.add(channel);
}
@@ -1604,7 +1599,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
*
* @param channel The channel reader/writer.
*/
- protected void unregisterOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
+ protected void unregisterOpenChannelToBeRemovedAtShudown(FileIOChannel channel) {
UnilateralSortMerger.this.openChannels.remove(channel);
}
}
@@ -1769,7 +1764,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
protected static final class ChannelWithBlockCount
{
- private final Channel.ID channel;
+ private final FileIOChannel.ID channel;
private final int blockCount;
public ChannelWithBlockCount(ID channel, int blockCount) {
@@ -1777,7 +1772,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
this.blockCount = blockCount;
}
- public Channel.ID getChannel() {
+ public FileIOChannel.ID getChannel() {
return channel;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 7220b23..93bece2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -75,6 +75,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
@@ -354,7 +355,7 @@ public class TaskManager implements TaskOperationProtocol {
(blobServerAddress), GlobalConfiguration.getConfiguration());
}
}
- this.ioManager = new IOManager(tmpDirPaths);
+ this.ioManager = new IOManagerAsync(tmpDirPaths);
// start the heart beats
{
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 72fe3f1..535c756 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -27,6 +27,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.UserGroupInformation;
+/**
+ * Utility class that gives access to the execution environment of the JVM, like
+ * the executing user, startup options, or the JVM version.
+ */
public class EnvironmentInformation {
private static final Logger LOG = LoggerFactory.getLogger(EnvironmentInformation.class);
@@ -183,6 +187,10 @@ public class EnvironmentInformation {
return UNKNOWN;
}
}
+
+ public static String getTemporaryFileDirectory() {
+ return System.getProperty("java.io.tmpdir");
+ }
public static void logEnvironmentInfo(Logger log, String componentName) {
if (log.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 879bd37..b4070ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -55,7 +55,7 @@ public final class BlobKeyTest {
}
/**
- * Tests the serialization/deserialization of BLOB keys using the regular {@link IOReadableWritable} API.
+ * Tests the serialization/deserialization of BLOB keys using the regular {@link org.apache.flink.core.io.IOReadableWritable} API.
*/
@Test
public void testSerialization() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index 2073061..c05fcca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -23,14 +23,14 @@ import java.io.EOFException;
import java.util.List;
import org.junit.Assert;
-
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -79,7 +79,7 @@ public class ChannelViewsTest
@Before
public void beforeTest() {
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE);
- this.ioManager = new IOManager();
+ this.ioManager = new IOManagerAsync();
}
@After
@@ -103,7 +103,7 @@ public class ChannelViewsTest
public void testWriteReadSmallRecords() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final Channel.ID channel = this.ioManager.createChannel();
+ final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -147,7 +147,7 @@ public class ChannelViewsTest
public void testWriteAndReadLongRecords() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final Channel.ID channel = this.ioManager.createChannel();
+ final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -188,7 +188,7 @@ public class ChannelViewsTest
public void testReadTooMany() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final Channel.ID channel = this.ioManager.createChannel();
+ final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -239,7 +239,7 @@ public class ChannelViewsTest
public void testReadWithoutKnownBlockCount() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final Channel.ID channel = this.ioManager.createChannel();
+ final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -283,7 +283,7 @@ public class ChannelViewsTest
public void testWriteReadOneBufferOnly() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final Channel.ID channel = this.ioManager.createChannel();
+ final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
@@ -327,7 +327,7 @@ public class ChannelViewsTest
public void testWriteReadNotAll() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final Channel.ID channel = this.ioManager.createChannel();
+ final FileIOChannel.ID channel = this.ioManager.createChannel();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 629d1dc..22c40f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -23,11 +23,11 @@ import java.io.EOFException;
import java.util.ArrayList;
import org.junit.Assert;
-
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.SpillingBuffer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
@@ -71,7 +71,7 @@ public class SpillingBufferTest {
@Before
public void beforeTest() {
memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
- ioManager = new IOManager();
+ ioManager = new IOManagerAsync();
}
@After
[3/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager
Readers and Writers to interfaces,
add implementation that uses callbacks on completed write requests.
Posted by uc...@apache.org.
[FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.
- This change also allows for a very simple way of plugging in a synchronous version of the I/O manager.
This closes #193.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c9cfe3ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c9cfe3ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c9cfe3ba
Branch: refs/heads/master
Commit: c9cfe3ba9a2009c3da2cb8a39090154c30ccd88c
Parents: 8e4c772
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 7 18:45:19 2014 +0100
Committer: uce <uc...@apache.org>
Committed: Tue Nov 11 11:48:14 2014 +0100
----------------------------------------------------------------------
.../io/disk/ChannelReaderInputViewIterator.java | 6 +-
.../disk/iomanager/AbstractFileIOChannel.java | 106 ++++
.../disk/iomanager/AsynchronousBlockReader.java | 131 +++++
.../disk/iomanager/AsynchronousBlockWriter.java | 88 +++
.../AsynchronousBlockWriterWithCallback.java | 67 +++
.../iomanager/AsynchronousBulkBlockReader.java | 107 ++++
.../iomanager/AsynchronousFileIOChannel.java | 264 +++++++++
.../io/disk/iomanager/BlockChannelAccess.java | 272 ---------
.../io/disk/iomanager/BlockChannelReader.java | 87 +--
.../io/disk/iomanager/BlockChannelWriter.java | 89 +--
.../BlockChannelWriterWithCallback.java | 35 ++
.../disk/iomanager/BulkBlockChannelReader.java | 59 +-
.../runtime/io/disk/iomanager/Channel.java | 109 ----
.../io/disk/iomanager/ChannelAccess.java | 172 ------
.../disk/iomanager/ChannelReaderInputView.java | 3 +-
.../disk/iomanager/ChannelWriterOutputView.java | 3 +-
.../io/disk/iomanager/FileIOChannel.java | 156 ++++++
.../runtime/io/disk/iomanager/IOManager.java | 547 ++-----------------
.../io/disk/iomanager/IOManagerAsync.java | 449 +++++++++++++++
.../runtime/io/disk/iomanager/IORequest.java | 18 +-
.../io/disk/iomanager/QueuingCallback.java | 47 ++
.../io/disk/iomanager/RequestDoneCallback.java | 36 ++
.../runtime/io/disk/iomanager/RequestQueue.java | 22 +-
.../iomanager/SynchronousFileIOChannel.java | 45 ++
.../iterative/io/SerializedUpdateBuffer.java | 4 +-
.../runtime/operators/hash/HashPartition.java | 8 +-
.../operators/hash/MutableHashTable.java | 4 +-
.../operators/hash/ReOpenableHashPartition.java | 6 +-
.../hash/ReOpenableMutableHashTable.java | 4 +-
.../sort/CombiningUnilateralSortMerger.java | 22 +-
.../operators/sort/UnilateralSortMerger.java | 57 +-
.../flink/runtime/taskmanager/TaskManager.java | 3 +-
.../runtime/util/EnvironmentInformation.java | 8 +
.../apache/flink/runtime/blob/BlobKeyTest.java | 2 +-
.../flink/runtime/io/disk/ChannelViewsTest.java | 18 +-
.../runtime/io/disk/SpillingBufferTest.java | 4 +-
.../io/disk/iomanager/IOManagerITCase.java | 4 +-
.../IOManagerPerformanceBenchmark.java | 17 +-
.../io/disk/iomanager/IOManagerTest.java | 65 +--
.../flink/runtime/operators/CrossTaskTest.java | 7 +-
.../operators/MatchTaskExternalITCase.java | 5 +-
.../flink/runtime/operators/MatchTaskTest.java | 5 +-
.../operators/hash/HashMatchIteratorITCase.java | 8 +-
.../runtime/operators/hash/HashTableITCase.java | 9 +-
.../hash/HashTablePerformanceComparison.java | 3 +-
.../hash/ReOpenableHashTableITCase.java | 5 +-
.../SpillingResettableIteratorTest.java | 3 +-
...lingResettableMutableObjectIteratorTest.java | 3 +-
.../CombiningUnilateralSortMergerITCase.java | 3 +-
.../operators/sort/ExternalSortITCase.java | 3 +-
.../sort/MassiveStringSortingITCase.java | 5 +-
.../sort/SortMergeMatchIteratorITCase.java | 11 +-
.../operators/testutils/DriverTestBase.java | 3 +-
.../operators/testutils/MockEnvironment.java | 3 +-
.../operators/util/HashVsSortMiniBenchmark.java | 5 +-
55 files changed, 1776 insertions(+), 1449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
index 7eaf635..f38aa25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
@@ -27,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.util.MutableObjectIterator;
@@ -46,14 +46,14 @@ public class ChannelReaderInputViewIterator<E> implements MutableObjectIterator<
private final List<MemorySegment> freeMemTarget;
- public ChannelReaderInputViewIterator(IOManager ioAccess, Channel.ID channel, List<MemorySegment> segments,
+ public ChannelReaderInputViewIterator(IOManager ioAccess, FileIOChannel.ID channel, List<MemorySegment> segments,
List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
throws IOException
{
this(ioAccess, channel, new LinkedBlockingQueue<MemorySegment>(), segments, freeMemTarget, accessors, numBlocks);
}
- public ChannelReaderInputViewIterator(IOManager ioAccess, Channel.ID channel, LinkedBlockingQueue<MemorySegment> returnQueue,
+ public ChannelReaderInputViewIterator(IOManager ioAccess, FileIOChannel.ID channel, LinkedBlockingQueue<MemorySegment> returnQueue,
List<MemorySegment> segments, List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
new file mode 100644
index 0000000..ecb794e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public abstract class AbstractFileIOChannel implements FileIOChannel {
+
+ /** Logger object for channel and its subclasses */
+ protected static final Logger LOG = LoggerFactory.getLogger(FileIOChannel.class);
+
+ /** The ID of the underlying channel. */
+ protected final FileIOChannel.ID id;
+
+ /** A file channel for NIO access to the file. */
+ protected final FileChannel fileChannel;
+
+
+ /**
+ * Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
+ * the given request queue to be processed.
+ *
+ * @param channelID The id describing the path of the file that the channel accessed.
+ * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
+ * than in read-only mode.
+ * @throws IOException Thrown, if the channel could no be opened.
+ */
+ protected AbstractFileIOChannel(FileIOChannel.ID channelID, boolean writeEnabled) throws IOException {
+ this.id = Preconditions.checkNotNull(channelID);
+
+ try {
+ @SuppressWarnings("resource")
+ RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
+ this.fileChannel = file.getChannel();
+ }
+ catch (IOException e) {
+ throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the channel ID of this channel.
+ *
+ * @return This channel's ID.
+ */
+ @Override
+ public final FileIOChannel.ID getChannelID() {
+ return this.id;
+ }
+
+ @Override
+ public abstract boolean isClosed();
+
+ @Override
+ public abstract void close() throws IOException;
+
+ @Override
+ public void deleteChannel() {
+ if (!isClosed() || this.fileChannel.isOpen()) {
+ throw new IllegalStateException("Cannot delete a channel that is open.");
+ }
+
+ // make a best effort to delete the file. Don't report exceptions.
+ try {
+ File f = new File(this.id.getPath());
+ if (f.exists()) {
+ f.delete();
+ }
+ } catch (Throwable t) {}
+ }
+
+ @Override
+ public void closeAndDelete() throws IOException {
+ try {
+ close();
+ } finally {
+ deleteChannel();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
new file mode 100644
index 0000000..35273f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A reader that reads data in blocks from a file channel. The reader reads the blocks into a
+ * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
+ * request is not processed by the thread that issues it, but by an asynchronous reader thread. Once the read request
+ * is done, the asynchronous reader adds the full MemorySegment to a <i>return queue</i> where it can be popped by the
+ * worker thread, once it needs the data. The return queue is in this case a
+ * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
+ * if the request is still pending when the it requires the data.
+ * <p>
+ * Typical pre-fetching reads are done by issuing the read requests early and popping the return queue once the data
+ * is actually needed.
+ * <p>
+ * The reader has no notion whether the size of the memory segments is actually the size of the blocks on disk,
+ * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
+ * writing and reading is consistent with each other (same blocks sizes) is up to the programmer.
+ */
+public class AsynchronousBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BlockChannelReader {
+
+ private final LinkedBlockingQueue<MemorySegment> returnSegments;
+
+ /**
+ * Creates a new block channel reader for the given channel.
+ *
+ * @param channelID The ID of the channel to read.
+ * @param requestQueue The request queue of the asynchronous reader thread, to which the I/O requests
+ * are added.
+ * @param returnSegments The return queue, to which the full Memory Segments are added.
+ * @throws IOException Thrown, if the underlying file channel could not be opened.
+ */
+ protected AsynchronousBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue,
+ LinkedBlockingQueue<MemorySegment> returnSegments)
+ throws IOException
+ {
+ super(channelID, requestQueue, new QueuingCallback(returnSegments), false);
+ this.returnSegments = returnSegments;
+ }
+
+ /**
+ * Issues a read request, which will asynchronously fill the given segment with the next block in the
+ * underlying file channel. Once the read request is fulfilled, the segment will be added to this reader's
+ * return queue.
+ *
+ * @param segment The segment to read the block into.
+ * @throws IOException Thrown, when the reader encounters an I/O error. Due to the asynchronous nature of the
+ * reader, the exception thrown here may have been caused by an earlier read request.
+ */
+ @Override
+ public void readBlock(MemorySegment segment) throws IOException {
+ // check the error state of this channel
+ checkErroneous();
+
+ // write the current buffer and get the next one
+ // the statements have to be in this order to avoid incrementing the counter
+ // after the channel has been closed
+ this.requestsNotReturned.incrementAndGet();
+ if (this.closed || this.requestQueue.isClosed()) {
+ // if we found ourselves closed after the counter increment,
+ // decrement the counter again and do not forward the request
+ this.requestsNotReturned.decrementAndGet();
+ throw new IOException("The reader has been closed.");
+ }
+ this.requestQueue.add(new SegmentReadRequest(this, segment));
+ }
+
+ /**
+ * Gets the next memory segment that has been filled with data by the reader. This method blocks until
+ * such a segment is available, or until an error occurs in the reader, or the reader is closed.
+ * <p>
+ * WARNING: If this method is invoked without any segment ever returning (for example, because the
+ * {@link #readBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+ * forever.
+ *
+ * @return The next memory segment from the reader's return queue.
+ * @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
+ */
+ @Override
+ public MemorySegment getNextReturnedSegment() throws IOException {
+ try {
+ while (true) {
+ final MemorySegment next = this.returnSegments.poll(1000, TimeUnit.MILLISECONDS);
+ if (next != null) {
+ return next;
+ } else {
+ if (this.closed) {
+ throw new IOException("The reader has been asynchronously closed.");
+ }
+ checkErroneous();
+ }
+ }
+ } catch (InterruptedException iex) {
+ throw new IOException("Reader was interrupted while waiting for the next returning segment.");
+ }
+ }
+
+ /**
+ * Gets the queue in which the full memory segments are queued after the asynchronous read
+ * is complete.
+ *
+ * @return The queue with the full memory segments.
+ */
+ @Override
+ public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+ return this.returnSegments;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
new file mode 100644
index 0000000..7e1681f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback implements BlockChannelWriter {
+
+ private final LinkedBlockingQueue<MemorySegment> returnSegments;
+
+ /**
+ * Creates a new block channel writer for the given channel.
+ *
+ * @param channelID The ID of the channel to write to.
+ * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests
+ * are added.
+ * @param returnSegments The return queue, to which the processed Memory Segments are added.
+ * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
+ */
+ protected AsynchronousBlockWriter(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
+ LinkedBlockingQueue<MemorySegment> returnSegments)
+ throws IOException
+ {
+ super(channelID, requestQueue, new QueuingCallback(returnSegments));
+ this.returnSegments = returnSegments;
+ }
+
+ /**
+ * Gets the next memory segment that has been written and is available again.
+ * This method blocks until such a segment is available, or until an error occurs in the writer, or the
+ * writer is closed.
+ * <p>
+ * NOTE: If this method is invoked without any segment ever returning (for example, because the
+ * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
+ * forever.
+ *
+ * @return The next memory segment from the writers's return queue.
+ * @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
+ */
+ @Override
+ public MemorySegment getNextReturnedSegment() throws IOException {
+ try {
+ while (true) {
+ final MemorySegment next = returnSegments.poll(1000, TimeUnit.MILLISECONDS);
+ if (next != null) {
+ return next;
+ } else {
+ if (this.closed) {
+ throw new IOException("The writer has been closed.");
+ }
+ checkErroneous();
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Writer was interrupted while waiting for the next returning segment.");
+ }
+ }
+
+ /**
+ * Gets the queue in which the memory segments are queued after the asynchronous write is completed.
+ *
+ * @return The queue with the written memory segments.
+ */
+ @Override
+ public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+ return this.returnSegments;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
new file mode 100644
index 0000000..6b6fb36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * An asynchronous implementation of the {@link BlockChannelWriterWithCallback} that queues I/O requests
+ * and calls a callback once they have been handled.
+ */
+public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<WriteRequest> implements BlockChannelWriterWithCallback {
+
+ /**
+ * Creates a new asynchronous block writer for the given channel.
+ *
+ * @param channelID The ID of the channel to write to.
+ * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests are added.
+ * @param callback The callback to be invoked when requests are done.
+ * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
+ */
+ protected AsynchronousBlockWriterWithCallback(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
+ RequestDoneCallback callback) throws IOException
+ {
+ super(channelID, requestQueue, callback, true);
+ }
+
+ /**
+ * Issues a asynchronous write request to the writer.
+ *
+ * @param segment The segment to be written.
+ * @throws IOException Thrown, when the writer encounters an I/O error. Due to the asynchronous nature of the
+ * writer, the exception thrown here may have been caused by an earlier write request.
+ */
+ @Override
+ public void writeBlock(MemorySegment segment) throws IOException {
+ // check the error state of this channel
+ checkErroneous();
+
+ // write the current buffer and get the next one
+ this.requestsNotReturned.incrementAndGet();
+ if (this.closed || this.requestQueue.isClosed()) {
+ // if we found ourselves closed after the counter increment,
+ // decrement the counter again and do not forward the request
+ this.requestsNotReturned.decrementAndGet();
+ throw new IOException("The writer has been closed.");
+ }
+ this.requestQueue.add(new SegmentWriteRequest(this, segment));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
new file mode 100644
index 0000000..048f82f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ *
+ */
+public class AsynchronousBulkBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BulkBlockChannelReader {
+
+ private final ArrayList<MemorySegment> returnBuffers;
+
+
+ protected AsynchronousBulkBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue,
+ List<MemorySegment> sourceSegments, int numBlocks)
+ throws IOException
+ {
+ this (channelID, requestQueue, sourceSegments, numBlocks, new ArrayList<MemorySegment>(numBlocks));
+ }
+
+ private AsynchronousBulkBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue,
+ List<MemorySegment> sourceSegments, int numBlocks, ArrayList<MemorySegment> target)
+ throws IOException
+ {
+ super(channelID, requestQueue, new CollectingCallback(target), false);
+ this.returnBuffers = target;
+
+ // sanity check
+ if (sourceSegments.size() < numBlocks) {
+ throw new IllegalArgumentException("The list of source memory segments must contain at least" +
+ " as many segments as the number of blocks to read.");
+ }
+
+ // send read requests for all blocks
+ for (int i = 0; i < numBlocks; i++) {
+ readBlock(sourceSegments.remove(sourceSegments.size() - 1));
+ }
+ }
+
+ private void readBlock(MemorySegment segment) throws IOException {
+ // check the error state of this channel
+ checkErroneous();
+
+ // write the current buffer and get the next one
+ this.requestsNotReturned.incrementAndGet();
+ if (this.closed || this.requestQueue.isClosed()) {
+ // if we found ourselves closed after the counter increment,
+ // decrement the counter again and do not forward the request
+ this.requestsNotReturned.decrementAndGet();
+ throw new IOException("The reader has been closed.");
+ }
+ this.requestQueue.add(new SegmentReadRequest(this, segment));
+ }
+
+ @Override
+ public List<MemorySegment> getFullSegments() {
+ synchronized (this.closeLock) {
+ if (!this.isClosed() || this.requestsNotReturned.get() > 0) {
+ throw new IllegalStateException("Full segments can only be obtained after the reader was properly closed.");
+ }
+ }
+
+ return this.returnBuffers;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class CollectingCallback implements RequestDoneCallback {
+
+ private final ArrayList<MemorySegment> list;
+
+ public CollectingCallback(ArrayList<MemorySegment> list) {
+ this.list = list;
+ }
+
+ @Override
+ public void requestSuccessful(MemorySegment buffer) {
+ list.add(buffer);
+ }
+
+ @Override
+ public void requestFailed(MemorySegment buffer, IOException e) {
+ list.add(buffer);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
new file mode 100644
index 0000000..098b334
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A base class for readers and writers that accept read or write requests for whole blocks.
+ * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
+ * segment of the block is added to a collection to be returned.
+ * <p>
+ * The asynchrony of the access makes it possible to implement read-ahead or write-behind types of I/O accesses.
+ *
+ * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to the I/O threads.
+ */
+public abstract class AsynchronousFileIOChannel<R extends IORequest> extends AbstractFileIOChannel {
+
+ /** The lock that is used during closing to synchronize the thread that waits for all
+ * requests to be handled with the asynchronous I/O thread. */
+ protected final Object closeLock = new Object();
+
+ /** A request queue for submitting asynchronous requests to the corresponding IO worker thread. */
+ protected final RequestQueue<R> requestQueue;
+
+ /** An atomic integer that counts the number of requests that we still wait for to return. */
+ protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
+
+ /** Hander for completed requests */
+ protected final RequestDoneCallback resultHander;
+
+ /** An exception that was encountered by the asynchronous request handling thread.*/
+ protected volatile IOException exception;
+
+ /** Flag marking this channel as closed */
+ protected volatile boolean closed;
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
+ * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers
+ * are returned by adding the to the given queue.
+ *
+ * @param channelID The id describing the path of the file that the channel accessed.
+ * @param requestQueue The queue that this channel hands its IO requests to.
+ * @param callback The callback to be invoked when a request is done.
+ * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
+ * than in read-only mode.
+ * @throws IOException Thrown, if the channel could no be opened.
+ */
+ protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, RequestQueue<R> requestQueue,
+ RequestDoneCallback callback, boolean writeEnabled) throws IOException
+ {
+ super(channelID, writeEnabled);
+
+ if (requestQueue == null) {
+ throw new NullPointerException();
+ }
+
+ this.requestQueue = requestQueue;
+ this.resultHander = callback;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean isClosed() {
+ return this.closed;
+ }
+
+ /**
+ * Closes the reader and waits until all pending asynchronous requests are
+ * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+ *
+ * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
+ * the closing was interrupted.
+ */
+ public void close() throws IOException {
+ // atomically set the close flag
+ synchronized (this.closeLock) {
+ if (this.closed) {
+ return;
+ }
+ this.closed = true;
+
+ try {
+ // wait until as many buffers have been returned as were written
+ // only then is everything guaranteed to be consistent.{
+ while (this.requestsNotReturned.get() > 0) {
+ try {
+ // we add a timeout here, because it is not guaranteed that the
+ // decrementing during buffer return and the check here are deadlock free.
+ // the deadlock situation is however unlikely and caught by the timeout
+ this.closeLock.wait(1000);
+ checkErroneous();
+ }
+ catch (InterruptedException iex) {}
+ }
+ }
+ finally {
+ // close the file
+ if (this.fileChannel.isOpen()) {
+ this.fileChannel.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * This method waits for all pending asynchronous requests to return. When the
+ * last request has returned, the channel is closed and deleted.
+ * <p>
+ * Even if an exception interrupts the closing, such that not all request are handled,
+ * the underlying <tt>FileChannel</tt> is closed and deleted.
+ *
+ * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if the closing was interrupted.
+ */
+ public void closeAndDelete() throws IOException {
+ try {
+ close();
+ }
+ finally {
+ deleteChannel();
+ }
+ }
+
+ /**
+ * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
+ * be processed correctly.
+ *
+ * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
+ * that defined the erroneous state as its cause.
+ */
+ public final void checkErroneous() throws IOException {
+ if (this.exception != null) {
+ throw this.exception;
+ }
+ }
+
+ /**
+ * Handles a processed <tt>Buffer</tt>. This method is invoked by the
+ * asynchronous IO worker threads upon completion of the IO request with the
+ * provided buffer and/or an exception that occurred while processing the request
+ * for that buffer.
+ *
+ * @param buffer The buffer to be processed.
+ * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
+ */
+ final void handleProcessedBuffer(MemorySegment buffer, IOException ex) {
+ // even if the callbacks throw an error, we need to maintain our bookkeeping
+ try {
+ if (ex != null && this.exception == null) {
+ this.exception = ex;
+ this.resultHander.requestFailed(buffer, ex);
+ }
+ else {
+ this.resultHander.requestSuccessful(buffer);
+ }
+ }
+ finally {
+ // decrement the number of missing buffers. If we are currently closing, notify the
+ if (this.closed) {
+ synchronized (this.closeLock) {
+ int num = this.requestsNotReturned.decrementAndGet();
+ if (num == 0) {
+ this.closeLock.notifyAll();
+ }
+ }
+ }
+ else {
+ this.requestsNotReturned.decrementAndGet();
+ }
+ }
+ }
+}
+
+//--------------------------------------------------------------------------------------------
+
+/**
+ * Read request that reads an entire memory segment from a block reader.
+ */
+final class SegmentReadRequest implements ReadRequest {
+
+ private final AsynchronousFileIOChannel<ReadRequest> channel;
+
+ private final MemorySegment segment;
+
+ protected SegmentReadRequest(AsynchronousFileIOChannel<ReadRequest> targetChannel, MemorySegment segment) {
+ this.channel = targetChannel;
+ this.segment = segment;
+ }
+
+ @Override
+ public void read() throws IOException {
+ final FileChannel c = this.channel.fileChannel;
+ if (c.size() - c.position() > 0) {
+ try {
+ final ByteBuffer wrapper = this.segment.wrap(0, this.segment.size());
+ this.channel.fileChannel.read(wrapper);
+ }
+ catch (NullPointerException npex) {
+ throw new IOException("Memory segment has been released.");
+ }
+ }
+ }
+
+ @Override
+ public void requestDone(IOException ioex) {
+ this.channel.handleProcessedBuffer(this.segment, ioex);
+ }
+}
+
+//--------------------------------------------------------------------------------------------
+
+/**
+ * Write request that writes an entire memory segment to the block writer.
+ */
+final class SegmentWriteRequest implements WriteRequest {
+
+ private final AsynchronousFileIOChannel<WriteRequest> channel;
+
+ private final MemorySegment segment;
+
+ protected SegmentWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) {
+ this.channel = targetChannel;
+ this.segment = segment;
+ }
+
+ @Override
+ public void write() throws IOException {
+ try {
+ this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
+ }
+ catch (NullPointerException npex) {
+ throw new IOException("Memory segment has been released.");
+ }
+ }
+
+ @Override
+ public void requestDone(IOException ioex) {
+ this.channel.handleProcessedBuffer(this.segment, ioex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
deleted file mode 100644
index f19586d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-
-/**
- * A base class for readers and writers that accept read or write requests for whole blocks.
- * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
- * segment of the block is added to a collection to be returned.
- * <p>
- * The asynchrony of the access makes it possible to implement read-ahead or write-behind types of I/O accesses.
- *
- *
- * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to
- * the I/O threads.
- * @param <C> The type of collection used to collect the segments from completed requests. Those segments are for
- * example for write requests the written and reusable segments, and for read requests the now full
- * and usable segments. The collection type may for example be a synchronized queue or an unsynchronized
- * list.
- */
-public abstract class BlockChannelAccess<R extends IORequest, C extends Collection<MemorySegment>> extends ChannelAccess<MemorySegment, R>
-{
- /**
- * The lock that is used during closing to synchronize the thread that waits for all
- * requests to be handled with the asynchronous I/O thread.
- */
- protected final Object closeLock = new Object();
-
- /**
- * An atomic integer that counts the number of buffers we still wait for to return.
- */
- protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
-
- /**
- * The collection gathering the processed buffers that are ready to be (re)used.
- */
- protected final C returnBuffers;
-
- /**
- * Flag marking this channel as closed;
- */
- protected volatile boolean closed;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
- * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers
- * are returned by adding the to the given queue.
- *
- * @param channelID The id describing the path of the file that the channel accessed.
- * @param requestQueue The queue that this channel hands its IO requests to.
- * @param returnQueue The queue to which the segments are added after their buffer was written.
- * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
- * than in read-only mode.
- * @throws IOException Thrown, if the channel could no be opened.
- */
- protected BlockChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue,
- C returnQueue, boolean writeEnabled)
- throws IOException
- {
- super(channelID, requestQueue, writeEnabled);
-
- if (requestQueue == null) {
- throw new NullPointerException();
- }
-
- this.returnBuffers = returnQueue;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the queue (or list) to which the asynchronous reader adds its elements.
- *
- * @return The queue (or list) to which the asynchronous reader adds its elements.
- */
- public C getReturnQueue()
- {
- return this.returnBuffers;
- }
-
-
- @Override
- public boolean isClosed()
- {
- return this.closed;
- }
-
- /**
- * Closes the reader and waits until all pending asynchronous requests are
- * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is
- * closed.
- *
- * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
- * the closing was interrupted.
- */
- public void close() throws IOException
- {
- // atomically set the close flag
- synchronized (this.closeLock) {
- if (this.closed) {
- return;
- }
- this.closed = true;
-
- try {
- // wait until as many buffers have been returned as were written
- // only then is everything guaranteed to be consistent.{
- while (this.requestsNotReturned.get() > 0) {
- try {
- // we add a timeout here, because it is not guaranteed that the
- // decrementing during buffer return and the check here are deadlock free.
- // the deadlock situation is however unlikely and caught by the timeout
- this.closeLock.wait(1000);
- checkErroneous();
- }
- catch (InterruptedException iex) {}
- }
- }
- finally {
- // close the file
- if (this.fileChannel.isOpen()) {
- this.fileChannel.close();
- }
- }
- }
- }
-
- /**
- * This method waits for all pending asynchronous requests to return. When the
- * last request has returned, the channel is closed and deleted.
- *
- * Even if an exception interrupts the closing, such that not all request are handled,
- * the underlying <tt>FileChannel</tt> is closed and deleted.
- *
- * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
- * the closing was interrupted.
- */
- public void closeAndDelete() throws IOException
- {
- try {
- close();
- }
- finally {
- deleteChannel();
- }
- }
-
-
- @Override
- protected void returnBuffer(MemorySegment buffer)
- {
- this.returnBuffers.add(buffer);
-
- // decrement the number of missing buffers. If we are currently closing, notify the
- if (this.closed) {
- synchronized (this.closeLock) {
- int num = this.requestsNotReturned.decrementAndGet();
- if (num == 0) {
- this.closeLock.notifyAll();
- }
- }
- }
- else {
- this.requestsNotReturned.decrementAndGet();
- }
- }
-}
-
-//--------------------------------------------------------------------------------------------
-
-/**
- * Special read request that reads an entire memory segment from a block reader.
- */
-final class SegmentReadRequest implements ReadRequest
-{
- private final BlockChannelAccess<ReadRequest, ?> channel;
-
- private final MemorySegment segment;
-
- protected SegmentReadRequest(BlockChannelAccess<ReadRequest, ?> targetChannel, MemorySegment segment)
- {
- this.channel = targetChannel;
- this.segment = segment;
- }
-
-
- @Override
- public void read() throws IOException
- {
- final FileChannel c = this.channel.fileChannel;
- if (c.size() - c.position() > 0) {
- try {
- final ByteBuffer wrapper = this.segment.wrap(0, this.segment.size());
- this.channel.fileChannel.read(wrapper);
- } catch (NullPointerException npex) {
- // the memory has been cleared asynchronouosly through task failing or canceling
- // ignore the request, since the result cannot be read
- }
- }
- }
-
-
- @Override
- public void requestDone(IOException ioex)
- {
- this.channel.handleProcessedBuffer(this.segment, ioex);
- }
-}
-
-//--------------------------------------------------------------------------------------------
-
-/**
- * Special write request that writes an entire memory segment to the block writer.
- */
-final class SegmentWriteRequest implements WriteRequest
-{
- private final BlockChannelAccess<WriteRequest, ?> channel;
-
- private final MemorySegment segment;
-
- protected SegmentWriteRequest(BlockChannelAccess<WriteRequest, ?> targetChannel, MemorySegment segment)
- {
- this.channel = targetChannel;
- this.segment = segment;
- }
-
-
- @Override
- public void write() throws IOException
- {
- try {
- this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
- } catch (NullPointerException npex) {
- // the memory has been cleared asynchronouosly through task failing or canceling
- // ignore the request, since there is nothing to write.
- }
- }
-
-
- @Override
- public void requestDone(IOException ioex)
- {
- this.channel.handleProcessedBuffer(this.segment, ioex);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
index f674ad4..f25827a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
@@ -16,76 +16,30 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
-
/**
* A reader that reads data in blocks from a file channel. The reader reads the blocks into a
- * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
- * request is not processed by the thread that issues it, but by an asynchronous reader thread. Once the read request
- * is done, the asynchronous reader adds the full MemorySegment to a <i>return queue</i> where it can be popped by the
- * worker thread, once it needs the data. The return queue is in this case a
- * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
- * if the request is still pending when the it requires the data.
- * <p>
- * Typical pre-fetching reads are done by issuing the read requests early and popping the return queue once the data
- * is actually needed.
- * <p>
- * The reader has no notion whether the size of the memory segments is actually the size of the blocks on disk,
- * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
- * writing and reading is consistent with each other (same blocks sizes) is up to the programmer.
+ * {@link org.apache.flink.core.memory.MemorySegment}. To support asynchronous implementations,
+ * the read method does not immediately return the full memory segment, but rather adds it to
+ * a blocking queue of finished read operations.
*/
-public class BlockChannelReader extends BlockChannelAccess<ReadRequest, LinkedBlockingQueue<MemorySegment>>
-{
- /**
- * Creates a new block channel reader for the given channel.
- *
- * @param channelID The ID of the channel to read.
- * @param requestQueue The request queue of the asynchronous reader thread, to which the I/O requests
- * are added.
- * @param returnSegments The return queue, to which the full Memory Segments are added.
- * @throws IOException Thrown, if the underlying file channel could not be opened.
- */
- protected BlockChannelReader(Channel.ID channelID, RequestQueue<ReadRequest> requestQueue,
- LinkedBlockingQueue<MemorySegment> returnSegments, int numRequestsToBundle)
- throws IOException
- {
- super(channelID, requestQueue, returnSegments, false);
- }
+public interface BlockChannelReader extends FileIOChannel {
/**
- * Issues a read request, which will asynchronously fill the given segment with the next block in the
+ * Issues a read request, which will fill the given segment with the next block in the
* underlying file channel. Once the read request is fulfilled, the segment will be added to this reader's
* return queue.
*
* @param segment The segment to read the block into.
- * @throws IOException Thrown, when the reader encounters an I/O error. Due to the asynchronous nature of the
- * reader, the exception thrown here may have been caused by an earlier read request.
+ * @throws IOException Thrown, when the reader encounters an I/O error.
*/
- public void readBlock(MemorySegment segment) throws IOException
- {
- // check the error state of this channel
- checkErroneous();
-
- // write the current buffer and get the next one
- // the statements have to be in this order to avoid incrementing the counter
- // after the channel has been closed
- this.requestsNotReturned.incrementAndGet();
- if (this.closed || this.requestQueue.isClosed()) {
- // if we found ourselves closed after the counter increment,
- // decrement the counter again and do not forward the request
- this.requestsNotReturned.decrementAndGet();
- throw new IOException("The reader has been closed.");
- }
- this.requestQueue.add(new SegmentReadRequest(this, segment));
- }
+ void readBlock(MemorySegment segment) throws IOException;
/**
* Gets the next memory segment that has been filled with data by the reader. This method blocks until
@@ -98,22 +52,13 @@ public class BlockChannelReader extends BlockChannelAccess<ReadRequest, LinkedBl
* @return The next memory segment from the reader's return queue.
* @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
*/
- public MemorySegment getNextReturnedSegment() throws IOException
- {
- try {
- while (true) {
- final MemorySegment next = this.returnBuffers.poll(2000, TimeUnit.MILLISECONDS);
- if (next != null) {
- return next;
- } else {
- if (this.closed) {
- throw new IOException("The reader has been asynchronously closed.");
- }
- checkErroneous();
- }
- }
- } catch (InterruptedException iex) {
- throw new IOException("Reader was interrupted while waiting for the next returning segment.");
- }
- }
+ public MemorySegment getNextReturnedSegment() throws IOException;
+
+ /**
+ * Gets the queue in which the full memory segments are queued after the read is complete.
+ *
+ * @return The queue with the full memory segments.
+ */
+ LinkedBlockingQueue<MemorySegment> getReturnQueue();
}
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
index 44a2edb..25c74e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
@@ -16,101 +16,40 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
-
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
-
/**
* A writer that writes data in blocks to a file channel. The writer receives the data blocks in the form of
* {@link org.apache.flink.core.memory.MemorySegment}, which it writes entirely to the channel,
- * regardless of how space in the segment is used. The writing happens in an asynchronous fashion. That is, a write
- * request is not processed by the thread that issues it, but by an asynchronous writer thread. Once the request
- * is done, the asynchronous writer adds the MemorySegment to a <i>return queue</i> where it can be popped by the
- * worker thread, to be reused. The return queue is in this case a
- * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
- * if the request is still pending when the it requires the segment back.
- * <p>
- * Typical write behind is realized, by having a small set of segments in the return queue at all times. When a
- * memory segment must be written, the request is issued to the writer and a new segment is immediately popped from
- * the return queue. Once too many requests have been issued and the I/O thread cannot keep up, the working thread
- * naturally blocks until another segment is available again.
+ * regardless of how space in the segment is used. The writing may be realized synchronously, or asynchronously,
+ * depending on the implementation.
*/
-public class BlockChannelWriter extends BlockChannelAccess<WriteRequest, LinkedBlockingQueue<MemorySegment>>
-{
- /**
- * Creates a new block channel writer for the given channel.
- *
- * @param channelID The ID of the channel to write to.
- * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests
- * are added.
- * @param returnSegments The return queue, to which the processed Memory Segments are added.
- * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
- */
- protected BlockChannelWriter(Channel.ID channelID, RequestQueue<WriteRequest> requestQueue,
- LinkedBlockingQueue<MemorySegment> returnSegments, int numRequestsToBundle)
- throws IOException
- {
- super(channelID, requestQueue, returnSegments, true);
- }
-
- /**
- * Issues a asynchronous write request to the writer.
- *
- * @param segment The segment to be written.
- * @throws IOException Thrown, when the writer encounters an I/O error. Due to the asynchronous nature of the
- * writer, the exception thrown here may have been caused by an earlier write request.
- */
- public void writeBlock(MemorySegment segment) throws IOException
- {
- // check the error state of this channel
- checkErroneous();
-
- // write the current buffer and get the next one
- this.requestsNotReturned.incrementAndGet();
- if (this.closed || this.requestQueue.isClosed()) {
- // if we found ourselves closed after the counter increment,
- // decrement the counter again and do not forward the request
- this.requestsNotReturned.decrementAndGet();
- throw new IOException("The writer has been closed.");
- }
- this.requestQueue.add(new SegmentWriteRequest(this, segment));
- }
+public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
/**
* Gets the next memory segment that has been written and is available again.
* This method blocks until such a segment is available, or until an error occurs in the writer, or the
* writer is closed.
* <p>
- * WARNING: If this method is invoked without any segment ever returning (for example, because the
- * {@link #writeBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+ * NOTE: If this method is invoked without any segment ever returning (for example, because the
+ * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
* forever.
*
* @return The next memory segment from the writers's return queue.
* @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
*/
- public MemorySegment getNextReturnedSegment() throws IOException
- {
- try {
- while (true) {
- final MemorySegment next = this.returnBuffers.poll(2000, TimeUnit.MILLISECONDS);
- if (next != null) {
- return next;
- } else {
- if (this.closed) {
- throw new IOException("The writer has been closed.");
- }
- checkErroneous();
- }
- }
- } catch (InterruptedException iex) {
- throw new IOException("Writer was interrupted while waiting for the next returning segment.");
- }
- }
+ MemorySegment getNextReturnedSegment() throws IOException;
+
+ /**
+ * Gets the queue in which the memory segments are queued after the asynchronous write
+ * is completed
+ *
+ * @return The queue with the written memory segments.
+ */
+ LinkedBlockingQueue<MemorySegment> getReturnQueue();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
new file mode 100644
index 0000000..57bc7e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+public interface BlockChannelWriterWithCallback extends FileIOChannel {
+
+ /**
+ * Writes the given memory segment. The request may be executed synchronously, or asynchronously, depending
+ * on the implementation.
+ *
+ * @param segment The segment to be written.
+ * @throws IOException Thrown, when the writer encounters an I/O error.
+ */
+ void writeBlock(MemorySegment segment) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
index 3be85d1..84883e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
@@ -16,71 +16,16 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.memory.MemorySegment;
-
/**
*
- *
*/
-public class BulkBlockChannelReader extends BlockChannelAccess<ReadRequest, ArrayList<MemorySegment>>
-{
+public interface BulkBlockChannelReader extends FileIOChannel {
-
- protected BulkBlockChannelReader(Channel.ID channelID, RequestQueue<ReadRequest> requestQueue,
- List<MemorySegment> sourceSegments, int numBlocks)
- throws IOException
- {
- super(channelID, requestQueue, new ArrayList<MemorySegment>(numBlocks), false);
-
- // sanity check
- if (sourceSegments.size() < numBlocks) {
- throw new IllegalArgumentException("The list of source memory segments must contain at least" +
- " as many segments as the number of blocks to read.");
- }
-
- // send read requests for all blocks
- for (int i = 0; i < numBlocks; i++) {
- readBlock(sourceSegments.remove(sourceSegments.size() - 1));
- }
- }
-
-
-
- private void readBlock(MemorySegment segment) throws IOException
- {
- // check the error state of this channel
- checkErroneous();
-
- // write the current buffer and get the next one
- this.requestsNotReturned.incrementAndGet();
- if (this.closed || this.requestQueue.isClosed()) {
- // if we found ourselves closed after the counter increment,
- // decrement the counter again and do not forward the request
- this.requestsNotReturned.decrementAndGet();
- throw new IOException("The reader has been closed.");
- }
- this.requestQueue.add(new SegmentReadRequest(this, segment));
- }
-
- public List<MemorySegment> getFullSegments()
- {
- synchronized (this.closeLock) {
- if (!this.isClosed() || this.requestsNotReturned.get() > 0) {
- throw new IllegalStateException("Full segments can only be obtained after the reader was properly closed.");
- }
- }
-
- return this.returnBuffers;
- }
-
+ List<MemorySegment> getFullSegments();
}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
deleted file mode 100644
index 7e64e79..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.File;
-import java.util.Random;
-
-import org.apache.flink.util.StringUtils;
-
-/**
- * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
- * files that contain sorted runs of data from the same stream, that will later on be merged together.
- *
- */
-public final class Channel
-{
- private static final int RANDOM_BYTES_LENGTH = 16;
-
- /**
- * An ID identifying an underlying fileChannel.
- *
- */
- public static class ID
- {
- private final String path;
-
- private final int threadNum;
-
- protected ID(final String path, final int threadNum) {
- this.path = path;
- this.threadNum = threadNum;
- }
-
- protected ID(final String basePath, final int threadNum, final Random random)
- {
- this.path = basePath + File.separator + randomString(random) + ".channel";
- this.threadNum = threadNum;
- }
-
- /**
- * Returns the path to the underlying temporary file.
- */
- public String getPath() {
- return path;
- }
-
- int getThreadNum() {
- return this.threadNum;
- }
-
- public String toString() {
- return path;
- }
- }
-
- public static final class Enumerator
- {
- private static final String FORMAT = "%s%s%s.%06d.channel";
-
- private final String[] paths;
-
- private final String namePrefix;
-
- private int counter;
-
- protected Enumerator(final String[] basePaths, final Random random)
- {
- this.paths = basePaths;
- this.namePrefix = randomString(random);
- this.counter = 0;
- }
-
- public ID next()
- {
- final int threadNum = counter % paths.length;
- return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
- }
- }
-
- /**
- * Creates a random byte sequence using the provided {@code random} generator and returns its hex representation.
- *
- * @param random
- * The random number generator to be used.
- * @return A hex representation of the generated byte sequence
- */
- private static final String randomString(final Random random) {
- final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
- random.nextBytes(bytes);
- return StringUtils.byteToHexString(bytes);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
deleted file mode 100644
index 2b5b34d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-
-
-/**
- * A base class for readers and writers that read data from I/O manager channels, or write data to them.
- * Requests handled by channels that inherit from this class are executed asynchronously, which allows
- * write-behind for writers and pre-fetching for readers.
- *
- *
- * @param <T> The buffer type used for the underlying IO operations.
- */
-public abstract class ChannelAccess<T, R extends IORequest>
-{
- /**
- * The ID of the underlying channel.
- */
- protected final Channel.ID id;
-
- /**
- * A file channel for NIO access to the file.
- */
- protected final FileChannel fileChannel;
-
- /**
- * A request queue for submitting asynchronous requests to the corresponding
- * IO worker thread.
- */
- protected final RequestQueue<R> requestQueue;
-
- /**
- * An exception that was encountered by the asynchronous request handling thread.
- */
- protected volatile IOException exception;
-
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
- * the given request queue to be processed.
- *
- * @param channelID The id describing the path of the file that the channel accessed.
- * @param requestQueue The queue that this channel hands its IO requests to.
- * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
- * than in read-only mode.
- * @throws IOException Thrown, if the channel could no be opened.
- */
- protected ChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue, boolean writeEnabled)
- throws IOException
- {
- if (channelID == null || requestQueue == null) {
- throw new NullPointerException();
- }
-
- this.id = channelID;
- this.requestQueue = requestQueue;
-
- try {
- @SuppressWarnings("resource")
- RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
- this.fileChannel = file.getChannel();
- }
- catch (IOException e) {
- throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Checks, whether this channel has been closed;
- *
- * @return True, if the channel has been closed, false otherwise.
- */
- public abstract boolean isClosed();
-
- /**
- * This method is invoked by the asynchronous I/O thread to return a buffer after the I/O request
- * completed.
- *
- * @param buffer The buffer to be returned.
- */
- protected abstract void returnBuffer(T buffer);
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the channel ID of this channel.
- *
- * @return This channel's ID.
- */
- public final Channel.ID getChannelID()
- {
- return this.id;
- }
-
- /**
- * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
- * be processed correctly.
- *
- * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
- * that defined the erroneous state as its cause.
- */
- public final void checkErroneous() throws IOException
- {
- if (this.exception != null) {
- throw new IOException("The channel is erroneous.", this.exception);
- }
- }
-
- /**
- * Deletes this channel by physically removing the file beneath it.
- * This method may only be called on a closed channel.
- */
- public void deleteChannel()
- {
- if (this.fileChannel.isOpen()) {
- throw new IllegalStateException("Cannot delete a channel that is open.");
- }
-
- // make a best effort to delete the file. Don't report exceptions.
- try {
- File f = new File(this.id.getPath());
- if (f.exists()) {
- f.delete();
- }
- } catch (Throwable t) {}
- }
-
- /**
- * Handles a processed <tt>Buffer</tt>. This method is invoked by the
- * asynchronous IO worker threads upon completion of the IO request with the
- * provided buffer and/or an exception that occurred while processing the request
- * for that buffer.
- *
- * @param buffer The buffer to be processed.
- * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
- */
- final void handleProcessedBuffer(T buffer, IOException ex) {
-
- if (ex != null && this.exception == null) {
- this.exception = ex;
- }
-
- returnBuffer(buffer);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index 25aa289..d85ec82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -164,8 +164,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
* @return A list containing all memory segments originally supplied to this view.
* @throws IOException Thrown, if the underlying reader could not be properly closed.
*/
- public List<MemorySegment> close() throws IOException
- {
+ public List<MemorySegment> close() throws IOException {
if (this.closed) {
throw new IllegalStateException("Already closed.");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index f230333..9824d34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -81,8 +81,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
* @param memory The memory used to buffer data, or null, to utilize solely the return queue.
* @param segmentSize The size of the memory segments.
*/
- public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize)
- {
+ public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize) {
super(segmentSize, HEADER_LENGTH);
if (writer == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
new file mode 100644
index 0000000..7c9d31b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
+ * files that contain sorted runs of data from the same stream, that will later on be merged together.
+ */
+public interface FileIOChannel {
+
+ /**
+ * Gets the channel ID of this I/O channel.
+ *
+ * @return The channel ID.
+ */
+ FileIOChannel.ID getChannelID();
+
+ /**
+ * Checks whether the channel has been closed.
+ *
+ * @return True if the channel has been closed, false otherwise.
+ */
+ boolean isClosed();
+
+ /**
+ * Closes the channel. For asynchronous implementations, this method waits until all pending requests are
+ * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+ *
+ * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+ */
+ void close() throws IOException;
+
+ /**
+ * Deletes the file underlying this I/O channel.
+ *
+ * @throws IllegalStateException Thrown, when the channel is still open.
+ */
+ void deleteChannel();
+
+ /**
+ * Closes the channel and deletes the underlying file.
+ * For asynchronous implementations, this method waits until all pending requests are handled;
+ *
+ * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+ */
+ public void closeAndDelete() throws IOException;
+
+ // --------------------------------------------------------------------------------------------
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * An ID identifying an underlying file channel.
+ */
+ public static class ID {
+
+ private static final int RANDOM_BYTES_LENGTH = 16;
+
+ private final String path;
+
+ private final int threadNum;
+
+ protected ID(String path, int threadNum) {
+ this.path = path;
+ this.threadNum = threadNum;
+ }
+
+ protected ID(String basePath, int threadNum, Random random) {
+ this.path = basePath + File.separator + randomString(random) + ".channel";
+ this.threadNum = threadNum;
+ }
+
+ /**
+ * Returns the path to the underlying temporary file.
+ */
+ public String getPath() {
+ return path;
+ }
+
+ int getThreadNum() {
+ return this.threadNum;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ID) {
+ ID other = (ID) obj;
+ return this.path.equals(other.path) && this.threadNum == other.threadNum;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+
+ private static final String randomString(final Random random) {
+ final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
+ random.nextBytes(bytes);
+ return StringUtils.byteToHexString(bytes);
+ }
+ }
+
+ /**
+ * An enumerator for channels that logically belong together.
+ */
+ public static final class Enumerator {
+
+ private static final String FORMAT = "%s%s%s.%06d.channel";
+
+ private final String[] paths;
+
+ private final String namePrefix;
+
+ private int counter;
+
+ protected Enumerator(String[] basePaths, Random random) {
+ this.paths = basePaths;
+ this.namePrefix = ID.randomString(random);
+ this.counter = 0;
+ }
+
+ public ID next() {
+ final int threadNum = counter % paths.length;
+ return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
+ }
+ }
+}