You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:05:56 UTC
[flink] 01/06: [hotfix][runtime] Cleanup IOManager code
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cdbfb82ef3eaa242abf6d070463c0895ac244ef1
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed Jun 19 12:36:54 2019 +0800
[hotfix][runtime] Cleanup IOManager code
---
.../runtime/io/disk/iomanager/FileIOChannel.java | 78 ++++++++++----------
.../flink/runtime/io/disk/iomanager/IOManager.java | 83 +++++++++++-----------
.../IOManagerAsyncWithNoOpBufferFileWriter.java | 53 --------------
.../operators/sort/LargeRecordHandlerITCase.java | 26 ++++---
4 files changed, 91 insertions(+), 149 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index fd8e8e6..ef57e03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -18,93 +18,90 @@
package org.apache.flink.runtime.io.disk.iomanager;
+import org.apache.flink.util.StringUtils;
+
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.flink.util.StringUtils;
-
/**
* A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
* files that contain sorted runs of data from the same stream, that will later on be merged together.
*/
public interface FileIOChannel {
-
+
/**
* Gets the channel ID of this I/O channel.
- *
+ *
* @return The channel ID.
*/
- FileIOChannel.ID getChannelID();
-
+ ID getChannelID();
+
/**
* Gets the size (in bytes) of the file underlying the channel.
- *
- * @return The size (in bytes) of the file underlying the channel.
*/
long getSize() throws IOException;
-
+
/**
* Checks whether the channel has been closed.
- *
+ *
* @return True if the channel has been closed, false otherwise.
*/
boolean isClosed();
/**
- * Closes the channel. For asynchronous implementations, this method waits until all pending requests are
- * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
- *
- * @throws IOException Thrown, if an error occurred while waiting for pending requests.
- */
+ * Closes the channel. For asynchronous implementations, this method waits until all pending requests are
+ * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+ *
+ * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+ */
void close() throws IOException;
/**
* Deletes the file underlying this I/O channel.
- *
+ *
* @throws IllegalStateException Thrown, when the channel is still open.
*/
void deleteChannel();
-
- /**
- * Closes the channel and deletes the underlying file.
- * For asynchronous implementations, this method waits until all pending requests are handled;
- *
- * @throws IOException Thrown, if an error occurred while waiting for pending requests.
- */
- public void closeAndDelete() throws IOException;
FileChannel getNioFileChannel();
-
+
+ /**
+ * Closes the channel and deletes the underlying file. For asynchronous implementations,
+ * this method waits until all pending requests are handled.
+ *
+ * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+ */
+ void closeAndDelete() throws IOException;
+
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
-
+
/**
* An ID identifying an underlying file channel.
*/
- public static class ID {
-
+ class ID {
+
private static final int RANDOM_BYTES_LENGTH = 16;
-
+
private final File path;
-
+
private final int threadNum;
- protected ID(File path, int threadNum) {
+ private ID(File path, int threadNum) {
this.path = path;
this.threadNum = threadNum;
}
- protected ID(File basePath, int threadNum, Random random) {
+ public ID(File basePath, int threadNum, Random random) {
this.path = new File(basePath, randomString(random) + ".channel");
this.threadNum = threadNum;
}
/**
* Returns the path to the underlying temporary file.
- * @return The path to the underlying temporary file..
*/
public String getPath() {
return path.getAbsolutePath();
@@ -112,12 +109,11 @@ public interface FileIOChannel {
/**
* Returns the path to the underlying temporary file as a File.
- * @return The path to the underlying temporary file as a File.
*/
public File getPathFile() {
return path;
}
-
+
int getThreadNum() {
return this.threadNum;
}
@@ -131,17 +127,17 @@ public interface FileIOChannel {
return false;
}
}
-
+
@Override
public int hashCode() {
return path.hashCode();
}
-
+
@Override
public String toString() {
return path.getAbsolutePath();
}
-
+
private static String randomString(Random random) {
byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
random.nextBytes(bytes);
@@ -152,7 +148,7 @@ public interface FileIOChannel {
/**
* An enumerator for channels that logically belong together.
*/
- public static final class Enumerator {
+ final class Enumerator {
private static AtomicInteger globalCounter = new AtomicInteger();
@@ -162,7 +158,7 @@ public interface FileIOChannel {
private int localCounter;
- protected Enumerator(File[] basePaths, Random random) {
+ public Enumerator(File[] basePaths, Random random) {
this.paths = basePaths;
this.namePrefix = ID.randomString(random);
this.localCounter = 0;
@@ -177,4 +173,4 @@ public interface FileIOChannel {
return new ID(new File(paths[threadNum], filename), threadNum);
}
}
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 0aaadf0..6723597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.FileUtils;
@@ -36,10 +38,9 @@ import java.util.concurrent.LinkedBlockingQueue;
* The facade for the provided I/O manager services.
*/
public abstract class IOManager {
- /** Logging */
protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
- /** The temporary directories for files */
+ /** The temporary directories for files. */
private final File[] paths;
/** A random number generator for the anonymous ChannelIDs. */
@@ -120,41 +121,40 @@ public abstract class IOManager {
// ------------------------------------------------------------------------
/**
- * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple
- * invocations of this method spread the channels evenly across the different directories.
+ * Creates a new {@link ID} in one of the temp directories. Multiple invocations of this
+ * method spread the channels evenly across the different directories.
*
* @return A channel to a temporary directory.
*/
- public FileIOChannel.ID createChannel() {
+ public ID createChannel() {
final int num = getNextPathNum();
- return new FileIOChannel.ID(this.paths[num], num, this.random);
+ return new ID(this.paths[num], num, this.random);
}
/**
- * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion
+ * Creates a new {@link Enumerator}, spreading the channels in a round-robin fashion
* across the temporary file directories.
*
* @return An enumerator for channels.
*/
- public FileIOChannel.Enumerator createChannelEnumerator() {
- return new FileIOChannel.Enumerator(this.paths, this.random);
+ public Enumerator createChannelEnumerator() {
+ return new Enumerator(this.paths, this.random);
}
/**
* Deletes the file underlying the given channel. If the channel is still open, this
* call may fail.
- *
+ *
* @param channel The channel to be deleted.
- * @throws IOException Thrown if the deletion fails.
*/
- public void deleteChannel(FileIOChannel.ID channel) throws IOException {
+ public static void deleteChannel(ID channel) {
if (channel != null) {
if (channel.getPathFile().exists() && !channel.getPathFile().delete()) {
LOG.warn("IOManager failed to delete temporary file {}", channel.getPath());
}
}
}
-
+
// ------------------------------------------------------------------------
// Reader / Writer instantiations
// ------------------------------------------------------------------------
@@ -167,8 +167,8 @@ public abstract class IOManager {
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
- return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
+ public BlockChannelWriter<MemorySegment> createBlockChannelWriter(ID channelID) throws IOException {
+ return createBlockChannelWriter(channelID, new LinkedBlockingQueue<>());
}
/**
@@ -180,8 +180,9 @@ public abstract class IOManager {
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID,
- LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
+ public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(
+ ID channelID,
+ LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
/**
* Creates a block channel writer that writes to the given channel. The writer calls the given callback
@@ -193,7 +194,9 @@ public abstract class IOManager {
* @return A block channel writer that writes to the given channel.
* @throws IOException Thrown, if the channel for the writer could not be opened.
*/
- public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException;
+ public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(
+ ID channelID,
+ RequestDoneCallback<MemorySegment> callback) throws IOException;
/**
* Creates a block channel reader that reads blocks from the given channel. The reader pushed
@@ -204,8 +207,8 @@ public abstract class IOManager {
* @return A block channel reader that reads from the given channel.
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
- public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
- return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
+ public BlockChannelReader<MemorySegment> createBlockChannelReader(ID channelID) throws IOException {
+ return createBlockChannelReader(channelID, new LinkedBlockingQueue<>());
}
/**
@@ -217,22 +220,27 @@ public abstract class IOManager {
* @return A block channel reader that reads from the given channel.
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
- public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID,
- LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
+ public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(
+ ID channelID,
+ LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
- public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
+ public abstract BufferFileWriter createBufferFileWriter(ID channelID) throws IOException;
- public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
+ public abstract BufferFileReader createBufferFileReader(
+ ID channelID,
+ RequestDoneCallback<Buffer> callback) throws IOException;
- public abstract BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException;
+ public abstract BufferFileSegmentReader createBufferFileSegmentReader(
+ ID channelID,
+ RequestDoneCallback<FileSegment> callback) throws IOException;
/**
* Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
* The reader draws segments to read the blocks into from a supplied list, which must contain as many
* segments as the channel has blocks. After the reader is done, the list with the full segments can be
* obtained from the reader.
- * <p>
- * If a channel is not to be read in one bulk, but in multiple smaller batches, a
+ *
+ * <p>If a channel is not to be read in one bulk, but in multiple smaller batches, a
* {@link BlockChannelReader} should be used.
*
* @param channelID The descriptor for the channel to write to.
@@ -241,26 +249,19 @@ public abstract class IOManager {
* @return A block channel reader that reads from the given channel.
* @throws IOException Thrown, if the channel for the reader could not be opened.
*/
- public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
- List<MemorySegment> targetSegments, int numBlocks) throws IOException;
+ public abstract BulkBlockChannelReader createBulkBlockChannelReader(
+ ID channelID,
+ List<MemorySegment> targetSegments,
+ int numBlocks) throws IOException;
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
-
- /**
- * Gets the number of directories across which the I/O manager rotates its files.
- *
- * @return The number of temporary file directories.
- */
- public int getNumberOfSpillingDirectories() {
- return this.paths.length;
- }
/**
* Gets the directories that the I/O manager spills to.
- *
+ *
* @return The directories that the I/O manager spills to.
*/
public File[] getSpillingDirectories() {
@@ -279,8 +280,8 @@ public abstract class IOManager {
}
return strings;
}
-
- protected int getNextPathNum() {
+
+ private int getNextPathNum() {
final int next = this.nextPath;
final int newNext = next + 1;
this.nextPath = newNext >= this.paths.length ? 0 : newNext;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
deleted file mode 100644
index 363e02b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-
-import java.io.IOException;
-
-/**
- * An {@link IOManagerAsync} that creates {@link BufferFileWriter} instances which do nothing in their {@link BufferFileWriter#writeBlock(Object)} method.
- *
- * <p>Beware: the passed {@link Buffer} instances must be cleaned up manually!
- */
-public class IOManagerAsyncWithNoOpBufferFileWriter extends IOManagerAsync {
- @Override
- public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
- throws IOException {
- return new NoOpAsynchronousBufferFileWriter(channelID, getWriteRequestQueue(channelID));
- }
-
- /**
- * {@link BufferFileWriter} subclass with a no-op in {@link #writeBlock(Buffer)}.
- */
- private static class NoOpAsynchronousBufferFileWriter extends AsynchronousBufferFileWriter {
-
- private NoOpAsynchronousBufferFileWriter(
- ID channelID,
- RequestQueue<WriteRequest> requestQueue) throws IOException {
- super(channelID, requestQueue);
- }
-
- @Override
- public void writeBlock(Buffer buffer) throws IOException {
- // do nothing
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 60d17bf..8f9e4dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -18,16 +18,6 @@
package org.apache.flink.runtime.operators.sort;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,10 +40,20 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.Value;
import org.apache.flink.util.MutableObjectIterator;
-
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class LargeRecordHandlerITCase extends TestLogger {
@Test
@@ -262,9 +262,7 @@ public class LargeRecordHandlerITCase extends TestLogger {
}
finally {
if (channel != null) {
- try {
- ioMan.deleteChannel(channel);
- } catch (IOException ignored) {}
+ ioMan.deleteChannel(channel);
}
ioMan.shutdown();