You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2023/07/20 22:37:31 UTC

[systemds] branch main updated: [SYSTEMDS-3597] Double Buffering for Buffer Pool Evictions

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 791d7f1d1f [SYSTEMDS-3597] Double Buffering for Buffer Pool Evictions
791d7f1d1f is described below

commit 791d7f1d1f20aed896d72c1f867bf315e4dd0f57
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Thu Jul 20 17:32:54 2023 +0200

    [SYSTEMDS-3597] Double Buffering for Buffer Pool Evictions
    
    This patch adds optional double buffering for writing large matrix or
    frame blocks. While writes of already serialized objects use file
    channels, we now also use double buffering when serializing and writing
    data objects, which overlaps serialization and I/O. Instead of changing
    the existing FastBufferedDataOutputStream, we simply add a new output
    stream that is chained between our serialization and the file I/O.
    
    When writing 1GB dense matrices sequentially on my laptop, the runtime
    [in ms] improved by 15-20% as follows:
    
    OLD:
    Wrote 1GB file in 1287.6416
    Wrote 1GB file in 1278.0401
    Wrote 1GB file in 1289.4362
    Wrote 1GB file in 1261.900799
    Wrote 1GB file in 1213.6319
    Wrote 1GB file in 1247.9706
    Wrote 1GB file in 1253.8381
    Wrote 1GB file in 1310.958899
    Wrote 1GB file in 1249.537899
    Wrote 1GB file in 1307.236
    
    NEW:
    Wrote 1GB file in 1127.8535
    Wrote 1GB file in 1020.9053
    Wrote 1GB file in 1063.8086
    Wrote 1GB file in 980.9357
    Wrote 1GB file in 1000.660601
    Wrote 1GB file in 1064.507401
    Wrote 1GB file in 1015.989001
    Wrote 1GB file in 1102.461899
    Wrote 1GB file in 1074.059
    Wrote 1GB file in 1136.3946
---
 .../runtime/util/DoubleBufferingOutputStream.java  | 108 +++++++++++++++++++++
 .../apache/sysds/runtime/util/LocalFileUtils.java  |  21 ++--
 2 files changed, 121 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
new file mode 100644
index 0000000000..0a2ff6bfa4
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package org.apache.sysds.runtime.util;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+
+/**
+ * Output stream that overlaps producing data (e.g., serialization) with writing
+ * it to the wrapped stream. Each write request is copied into one of a fixed
+ * number of rotating buffers, which a background task then writes to the
+ * underlying stream while the caller continues.
+ * <p>
+ * Output order relies on the worker pool running one task at a time; the pool
+ * is obtained via {@code CommonThreadPool.get(1)}, assumed to be a single
+ * worker (TODO confirm). Failures of asynchronous writes are rethrown as
+ * {@link IOException} by a subsequent write, {@link #flush()} or {@link #close()}.
+ */
+public class DoubleBufferingOutputStream extends FilterOutputStream
+{
+	/** worker for asynchronous buffer writes */
+	final ExecutorService _pool;
+	/** pending write per buffer; a buffer is refilled only after its write completed */
+	final Future<?>[] _locks;
+	/** rotating staging buffers; the array also serves as monitor for all writers */
+	protected final byte[][] _buff;
+	/** index of the next buffer to fill */
+	private int _pos;
+	
+	/**
+	 * Creates a stream with two buffers of 8192 bytes each.
+	 * 
+	 * @param out underlying output stream
+	 */
+	public DoubleBufferingOutputStream(OutputStream out) {
+		this(out, 2, 8192);
+	}
+
+	/**
+	 * Creates a stream with a custom number and size of buffers.
+	 * 
+	 * @param out underlying output stream
+	 * @param num number of rotating buffers (&gt; 0)
+	 * @param size size of each buffer in bytes (&gt; 0 and a multiple of 8)
+	 * @throws IllegalArgumentException if num or size is invalid
+	 */
+	public DoubleBufferingOutputStream(OutputStream out, int num, int size) {
+		super(out);
+		if(num <= 0)
+			throw new IllegalArgumentException("Number of buffers <= 0.");
+		if(size <= 0)
+			throw new IllegalArgumentException("Buffer size <= 0.");
+		if( size%8 != 0 )
+			throw new IllegalArgumentException("Buffer size not a multiple of 8.");
+		_buff = new byte[num][size];
+		_locks = new Future<?>[num];
+		for(int i=0; i<num; i++)
+			_locks[i] = ConcurrentUtils.constantFuture(null);
+		//create the worker only after validation to not leak it on invalid arguments;
+		//a single worker (no outrun) keeps the buffer writes in submission order
+		_pool = CommonThreadPool.get(1);
+	}
+
+	/**
+	 * Writes a single byte via the buffered path, so it stays ordered with
+	 * respect to pending asynchronous writes.
+	 */
+	@Override
+	public void write(int b) throws IOException {
+		write(new byte[] {(byte) b}, 0, 1);
+	}
+
+	/**
+	 * Copies the given bytes into the next free buffer(s) and submits their
+	 * asynchronous write; requests larger than a buffer are split into
+	 * buffer-sized chunks. Blocks while the next buffer is still being written.
+	 * 
+	 * @param b source data (may be reused by the caller once this call returns)
+	 * @param off start offset in b
+	 * @param len number of bytes to write
+	 * @throws IOException if a previous asynchronous write failed or submission fails
+	 */
+	@Override
+	public void write(byte[] b, int off, int len) 
+		throws IOException 
+	{
+		if(off < 0 || len < 0 || len > b.length - off)
+			throw new IndexOutOfBoundsException("Invalid range: off="
+				+ off + ", len=" + len + ", length=" + b.length);
+		try {
+			synchronized(_buff) {
+				while(len > 0) {
+					final byte[] buff = _buff[_pos];
+					final int n = Math.min(len, buff.length);
+					
+					//block until buffer is free to use
+					_locks[_pos].get();
+					
+					//copy for asynchronous write because b is reused higher up
+					System.arraycopy(b, off, buff, 0, n);
+					
+					//submit write request; capture buffer and length as locals
+					//because _pos is advanced before the task runs
+					_locks[_pos] = _pool.submit(() -> writeBuffer(buff, 0, n));
+					_pos = (_pos+1) % _buff.length;
+					off += n;
+					len -= n;
+				}
+			}
+		}
+		catch(InterruptedException ex) {
+			Thread.currentThread().interrupt();
+			throw new IOException(ex);
+		}
+		catch(Exception ex) {
+			throw new IOException(ex);
+		}
+	}
+	
+	/**
+	 * Writes the given range to the underlying stream; runs on the worker.
+	 * 
+	 * @param b data to write
+	 * @param off start offset in b
+	 * @param len number of bytes to write
+	 * @throws RuntimeException wrapping any failure of the underlying write
+	 */
+	public void writeBuffer(byte[] b, int off, int len) {
+		try {
+			out.write(b, off, len);
+		}
+		catch(Exception ex) {
+			throw new RuntimeException(ex);
+		}
+	}
+
+	/**
+	 * Waits for all pending asynchronous writes and flushes the underlying stream.
+	 * 
+	 * @throws IOException if a pending write failed or the flush fails
+	 */
+	@Override
+	public void flush() throws IOException {
+		try {
+			synchronized(_buff) {
+				for(Future<?> lock : _locks)
+					lock.get();
+			}
+		}
+		catch(InterruptedException ex) {
+			Thread.currentThread().interrupt();
+			throw new IOException(ex);
+		}
+		catch(Exception ex) {
+			throw new IOException(ex);
+		}
+		out.flush();
+	}
+
+	/**
+	 * Shuts down the worker (previously submitted writes still complete), then
+	 * flushes pending writes and closes the underlying stream.
+	 */
+	@Override
+	public void close() throws IOException {
+		_pool.shutdown();
+		super.close();
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
index 88b1b66ffe..99a5131115 100644
--- a/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
@@ -28,6 +28,7 @@ import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Writer;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -54,6 +55,7 @@ import org.apache.sysds.runtime.matrix.data.Pair;
 public class LocalFileUtils 
 {
 	public static final int BUFFER_SIZE = 8192;
+	public static final int DOUBLE_BUFFERING_MIN = 100*1024; //min getInMemorySize() (inclusive) of a block to write it with double buffering
 	
 	//unique IDs per JVM for tmp files
 	private static IDSequence _seq = null;
@@ -197,7 +199,7 @@ public class LocalFileUtils
 	 * @throws IOException if IOException occurs
 	 */
	public static void writeMatrixBlockToLocal(String fname, MatrixBlock mb) throws IOException {
-		writeWritableToLocal(fname, mb);
+		writeWritableToLocal(fname, mb, mb.getInMemorySize()>=DOUBLE_BUFFERING_MIN); //double buffering only for blocks of at least DOUBLE_BUFFERING_MIN
	}
 	
 	/** Writes a frame block to local file system.
@@ -207,7 +209,7 @@ public class LocalFileUtils
 	 * @throws IOException if IOException occurs
 	 */
	public static void writeFrameBlockToLocal(String fname, FrameBlock fb) throws IOException {
-		writeWritableToLocal(fname, fb);
+		writeWritableToLocal(fname, fb, fb.getInMemorySize()>=DOUBLE_BUFFERING_MIN); //double buffering only for blocks of at least DOUBLE_BUFFERING_MIN
	}
 
 	/** Writes a matrix/frame block to local file system.
@@ -217,7 +219,7 @@ public class LocalFileUtils
 	 * @throws IOException if IOException occurs
 	 */
	public static void writeCacheBlockToLocal(String fname, CacheBlock<?> cb) throws IOException {
-		writeWritableToLocal(fname, cb);
+		writeWritableToLocal(fname, cb, cb.getInMemorySize()>=DOUBLE_BUFFERING_MIN); //double buffering only for blocks of at least DOUBLE_BUFFERING_MIN
	}
 	
 	/**
@@ -226,21 +228,27 @@ public class LocalFileUtils
	 * 
	 * @param fname file name to write
	 * @param mb Hadoop writable
+	 * @param doubleBuffering if true, overlap serialization and file I/O via DoubleBufferingOutputStream
	 * @throws IOException if IOException occurs
	 */
-	public static void writeWritableToLocal(String fname, Writable mb)
+	public static void writeWritableToLocal(String fname, Writable mb, boolean doubleBuffering)
		throws IOException
-	{	
-		FileOutputStream fos = new FileOutputStream( fname );
+	{
+		OutputStream fos = new FileOutputStream( fname );
+		if( doubleBuffering )
+			fos = new DoubleBufferingOutputStream(fos, 2, BUFFER_SIZE);
		FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE);
		
		try {
			mb.write(out);
+			//flush explicitly so failures of pending (incl. asynchronous) writes propagate
+			//here, rather than only during the (presumably error-swallowing) closeSilently
+			out.flush();
		}
		finally {
-			IOUtilFunctions.closeSilently(out);
+			IOUtilFunctions.closeSilently(out); //incl double buffering
			IOUtilFunctions.closeSilently(fos);
-		}	
+		}
	}
 
 	public static void writeByteArrayToLocal( String fname, byte[] data )