You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:05:56 UTC

[flink] 01/06: [hotfix][runtime] Cleanup IOManager code

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdbfb82ef3eaa242abf6d070463c0895ac244ef1
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed Jun 19 12:36:54 2019 +0800

    [hotfix][runtime] Cleanup IOManager code
---
 .../runtime/io/disk/iomanager/FileIOChannel.java   | 78 ++++++++++----------
 .../flink/runtime/io/disk/iomanager/IOManager.java | 83 +++++++++++-----------
 .../IOManagerAsyncWithNoOpBufferFileWriter.java    | 53 --------------
 .../operators/sort/LargeRecordHandlerITCase.java   | 26 ++++---
 4 files changed, 91 insertions(+), 149 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index fd8e8e6..ef57e03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -18,93 +18,90 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.util.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.util.StringUtils;
-
 /**
  * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
  * files that contain sorted runs of data from the same stream, that will later on be merged together.
  */
 public interface FileIOChannel {
-	
+
 	/**
 	 * Gets the channel ID of this I/O channel.
-	 * 
+	 *
 	 * @return The channel ID.
 	 */
-	FileIOChannel.ID getChannelID();
-	
+	ID getChannelID();
+
 	/**
 	 * Gets the size (in bytes) of the file underlying the channel.
-	 * 
-	 * @return The size (in bytes) of the file underlying the channel.
 	 */
 	long getSize() throws IOException;
-	
+
 	/**
 	 * Checks whether the channel has been closed.
-	 * 
+	 *
 	 * @return True if the channel has been closed, false otherwise.
 	 */
 	boolean isClosed();
 
 	/**
-	* Closes the channel. For asynchronous implementations, this method waits until all pending requests are
-	* handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
-	* 
-	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
-	*/
+	 * Closes the channel. For asynchronous implementations, this method waits until all pending requests are
+	 * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+	 *
+	 * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+	 */
 	void close() throws IOException;
 
 	/**
 	 * Deletes the file underlying this I/O channel.
-	 *  
+	 *
 	 * @throws IllegalStateException Thrown, when the channel is still open.
 	 */
 	void deleteChannel();
-	
-	/**
-	* Closes the channel and deletes the underlying file.
-	* For asynchronous implementations, this method waits until all pending requests are handled;
-	* 
-	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
-	*/
-	public void closeAndDelete() throws IOException;
 
 	FileChannel getNioFileChannel();
-	
+
+	/**
+	 * Closes the channel and deletes the underlying file. For asynchronous implementations,
+	 * this method waits until all pending requests are handled.
+	 *
+	 * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+	 */
+	void closeAndDelete() throws IOException;
+
 	// --------------------------------------------------------------------------------------------
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * An ID identifying an underlying file channel.
 	 */
-	public static class ID {
-		
+	class ID {
+
 		private static final int RANDOM_BYTES_LENGTH = 16;
-		
+
 		private final File path;
-		
+
 		private final int threadNum;
 
-		protected ID(File path, int threadNum) {
+		private ID(File path, int threadNum) {
 			this.path = path;
 			this.threadNum = threadNum;
 		}
 
-		protected ID(File basePath, int threadNum, Random random) {
+		public ID(File basePath, int threadNum, Random random) {
 			this.path = new File(basePath, randomString(random) + ".channel");
 			this.threadNum = threadNum;
 		}
 
 		/**
 		 * Returns the path to the underlying temporary file.
-		 * @return The path to the underlying temporary file..
 		 */
 		public String getPath() {
 			return path.getAbsolutePath();
@@ -112,12 +109,11 @@ public interface FileIOChannel {
 
 		/**
 		 * Returns the path to the underlying temporary file as a File.
-		 * @return The path to the underlying temporary file as a File.
 		 */
 		public File getPathFile() {
 			return path;
 		}
-		
+
 		int getThreadNum() {
 			return this.threadNum;
 		}
@@ -131,17 +127,17 @@ public interface FileIOChannel {
 				return false;
 			}
 		}
-		
+
 		@Override
 		public int hashCode() {
 			return path.hashCode();
 		}
-		
+
 		@Override
 		public String toString() {
 			return path.getAbsolutePath();
 		}
-		
+
 		private static String randomString(Random random) {
 			byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
 			random.nextBytes(bytes);
@@ -152,7 +148,7 @@ public interface FileIOChannel {
 	/**
 	 * An enumerator for channels that logically belong together.
 	 */
-	public static final class Enumerator {
+	final class Enumerator {
 
 		private static AtomicInteger globalCounter = new AtomicInteger();
 
@@ -162,7 +158,7 @@ public interface FileIOChannel {
 
 		private int localCounter;
 
-		protected Enumerator(File[] basePaths, Random random) {
+		public Enumerator(File[] basePaths, Random random) {
 			this.paths = basePaths;
 			this.namePrefix = ID.randomString(random);
 			this.localCounter = 0;
@@ -177,4 +173,4 @@ public interface FileIOChannel {
 			return new ID(new File(paths[threadNum], filename), threadNum);
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 0aaadf0..6723597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.FileUtils;
 
@@ -36,10 +38,9 @@ import java.util.concurrent.LinkedBlockingQueue;
  * The facade for the provided I/O manager services.
  */
 public abstract class IOManager {
-	/** Logging */
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
-	/** The temporary directories for files */
+	/** The temporary directories for files. */
 	private final File[] paths;
 
 	/** A random number generator for the anonymous ChannelIDs. */
@@ -120,41 +121,40 @@ public abstract class IOManager {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple
-	 * invocations of this method spread the channels evenly across the different directories.
+	 * Creates a new {@link ID} in one of the temp directories. Multiple invocations of this
+	 * method spread the channels evenly across the different directories.
 	 *
 	 * @return A channel to a temporary directory.
 	 */
-	public FileIOChannel.ID createChannel() {
+	public ID createChannel() {
 		final int num = getNextPathNum();
-		return new FileIOChannel.ID(this.paths[num], num, this.random);
+		return new ID(this.paths[num], num, this.random);
 	}
 
 	/**
-	 * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion
+	 * Creates a new {@link Enumerator}, spreading the channels in a round-robin fashion
 	 * across the temporary file directories.
 	 *
 	 * @return An enumerator for channels.
 	 */
-	public FileIOChannel.Enumerator createChannelEnumerator() {
-		return new FileIOChannel.Enumerator(this.paths, this.random);
+	public Enumerator createChannelEnumerator() {
+		return new Enumerator(this.paths, this.random);
 	}
 
 	/**
 	 * Deletes the file underlying the given channel. If the channel is still open, this
 	 * call may fail.
-	 * 
+	 *
 	 * @param channel The channel to be deleted.
-	 * @throws IOException Thrown if the deletion fails.
 	 */
-	public void deleteChannel(FileIOChannel.ID channel) throws IOException {
+	public static void deleteChannel(ID channel) {
 		if (channel != null) {
 			if (channel.getPathFile().exists() && !channel.getPathFile().delete()) {
 				LOG.warn("IOManager failed to delete temporary file {}", channel.getPath());
 			}
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//                        Reader / Writer instantiations
 	// ------------------------------------------------------------------------
@@ -167,8 +167,8 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
-		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
+	public BlockChannelWriter<MemorySegment> createBlockChannelWriter(ID channelID) throws IOException {
+		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<>());
 	}
 
 	/**
@@ -180,8 +180,9 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID,
-				LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
+	public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(
+		ID channelID,
+		LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 
 	/**
 	 * Creates a block channel writer that writes to the given channel. The writer calls the given callback
@@ -193,7 +194,9 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException;
+	public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(
+		ID channelID,
+		RequestDoneCallback<MemorySegment> callback) throws IOException;
 
 	/**
 	 * Creates a block channel reader that reads blocks from the given channel. The reader pushed
@@ -204,8 +207,8 @@ public abstract class IOManager {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
-		return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
+	public BlockChannelReader<MemorySegment> createBlockChannelReader(ID channelID) throws IOException {
+		return createBlockChannelReader(channelID, new LinkedBlockingQueue<>());
 	}
 
 	/**
@@ -217,22 +220,27 @@ public abstract class IOManager {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID,
-										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
+	public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(
+		ID channelID,
+		LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 
-	public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
+	public abstract BufferFileWriter createBufferFileWriter(ID channelID) throws IOException;
 
-	public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
+	public abstract BufferFileReader createBufferFileReader(
+		ID channelID,
+		RequestDoneCallback<Buffer> callback) throws IOException;
 
-	public abstract BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException;
+	public abstract BufferFileSegmentReader createBufferFileSegmentReader(
+		ID channelID,
+		RequestDoneCallback<FileSegment> callback) throws IOException;
 
 	/**
 	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
 	 * The reader draws segments to read the blocks into from a supplied list, which must contain as many
 	 * segments as the channel has blocks. After the reader is done, the list with the full segments can be
 	 * obtained from the reader.
-	 * <p>
-	 * If a channel is not to be read in one bulk, but in multiple smaller batches, a
+	 *
+	 * <p>If a channel is not to be read in one bulk, but in multiple smaller batches, a
 	 * {@link BlockChannelReader} should be used.
 	 *
 	 * @param channelID The descriptor for the channel to write to.
@@ -241,26 +249,19 @@ public abstract class IOManager {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
-			List<MemorySegment> targetSegments, int numBlocks) throws IOException;
+	public abstract BulkBlockChannelReader createBulkBlockChannelReader(
+		ID channelID,
+		List<MemorySegment> targetSegments,
+		int numBlocks) throws IOException;
 
 
 	// ------------------------------------------------------------------------
 	//                          Utilities
 	// ------------------------------------------------------------------------
-	
-	/**
-	 * Gets the number of directories across which the I/O manager rotates its files.
-	 * 
-	 * @return The number of temporary file directories.
-	 */
-	public int getNumberOfSpillingDirectories() {
-		return this.paths.length;
-	}
 
 	/**
 	 * Gets the directories that the I/O manager spills to.
-	 * 
+	 *
 	 * @return The directories that the I/O manager spills to.
 	 */
 	public File[] getSpillingDirectories() {
@@ -279,8 +280,8 @@ public abstract class IOManager {
 		}
 		return strings;
 	}
-	
-	protected int getNextPathNum() {
+
+	private int getNextPathNum() {
 		final int next = this.nextPath;
 		final int newNext = next + 1;
 		this.nextPath = newNext >= this.paths.length ? 0 : newNext;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
deleted file mode 100644
index 363e02b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-
-import java.io.IOException;
-
-/**
- * An {@link IOManagerAsync} that creates {@link BufferFileWriter} instances which do nothing in their {@link BufferFileWriter#writeBlock(Object)} method.
- *
- * <p>Beware: the passed {@link Buffer} instances must be cleaned up manually!
- */
-public class IOManagerAsyncWithNoOpBufferFileWriter extends IOManagerAsync {
-	@Override
-	public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
-			throws IOException {
-		return new NoOpAsynchronousBufferFileWriter(channelID, getWriteRequestQueue(channelID));
-	}
-
-	/**
-	 * {@link BufferFileWriter} subclass with a no-op in {@link #writeBlock(Buffer)}.
-	 */
-	private static class NoOpAsynchronousBufferFileWriter extends AsynchronousBufferFileWriter {
-
-		private NoOpAsynchronousBufferFileWriter(
-				ID channelID,
-				RequestQueue<WriteRequest> requestQueue) throws IOException {
-			super(channelID, requestQueue);
-		}
-
-		@Override
-		public void writeBlock(Buffer buffer) throws IOException {
-			// do nothing
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 60d17bf..8f9e4dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,10 +40,20 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.MutableObjectIterator;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class LargeRecordHandlerITCase extends TestLogger {
 
 	@Test
@@ -262,9 +262,7 @@ public class LargeRecordHandlerITCase extends TestLogger {
 		}
 		finally {
 			if (channel != null) {
-				try {
-					ioMan.deleteChannel(channel);
-				} catch (IOException ignored) {}
+				ioMan.deleteChannel(channel);
 			}
 
 			ioMan.shutdown();