You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2023/07/20 22:37:31 UTC
[systemds] branch main updated: [SYSTEMDS-3597] Double Buffering for Buffer Pool Evictions
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 791d7f1d1f [SYSTEMDS-3597] Double Buffering for Buffer Pool Evictions
791d7f1d1f is described below
commit 791d7f1d1f20aed896d72c1f867bf315e4dd0f57
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Thu Jul 20 17:32:54 2023 +0200
[SYSTEMDS-3597] Double Buffering for Buffer Pool Evictions
This patch adds optional double buffering to the writing of large matrix
or frame blocks. While the write of already-serialized objects uses file
channels, we now also use double buffering when serializing and writing
data objects, which overlaps serialization and I/O. Instead of changing
the existing FastBufferedDataOutputStream, we simply add a new output
stream that is chained between our serialization and the file I/O.
When writing a 1GB dense matrix sequentially on my laptop (10 runs each),
the runtime [in ms] improved by 15-20% (mean: 1270 ms -> 1059 ms, ~17%) as follows:
OLD:
Wrote 1GB file in 1287.6416
Wrote 1GB file in 1278.0401
Wrote 1GB file in 1289.4362
Wrote 1GB file in 1261.900799
Wrote 1GB file in 1213.6319
Wrote 1GB file in 1247.9706
Wrote 1GB file in 1253.8381
Wrote 1GB file in 1310.958899
Wrote 1GB file in 1249.537899
Wrote 1GB file in 1307.236
NEW:
Wrote 1GB file in 1127.8535
Wrote 1GB file in 1020.9053
Wrote 1GB file in 1063.8086
Wrote 1GB file in 980.9357
Wrote 1GB file in 1000.660601
Wrote 1GB file in 1064.507401
Wrote 1GB file in 1015.989001
Wrote 1GB file in 1102.461899
Wrote 1GB file in 1074.059
Wrote 1GB file in 1136.3946
---
.../runtime/util/DoubleBufferingOutputStream.java | 108 +++++++++++++++++++++
.../apache/sysds/runtime/util/LocalFileUtils.java | 21 ++--
2 files changed, 121 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
new file mode 100644
index 0000000000..0a2ff6bfa4
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.util;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+
+public class DoubleBufferingOutputStream extends FilterOutputStream
+{
+ ExecutorService _pool = CommonThreadPool.get(1); //no outrun
+ Future<?>[] _locks;
+ protected byte[][] _buff;
+ private int _pos;
+
+ public DoubleBufferingOutputStream(OutputStream out) {
+ this(out, 2, 8192);
+ }
+
+ public DoubleBufferingOutputStream(OutputStream out, int num, int size) {
+ super(out);
+ if(size <= 0)
+ throw new IllegalArgumentException("Buffer size <= 0.");
+ if( size%8 != 0 )
+ throw new IllegalArgumentException("Buffer size not a multiple of 8.");
+ _buff = new byte[num][size];
+ _locks = new Future<?>[num];
+ for(int i=0; i<num; i++)
+ _locks[i] = ConcurrentUtils.constantFuture(null);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len)
+ throws IOException
+ {
+ try {
+ synchronized(_buff) {
+ //block until buffer is free to use
+ _locks[_pos].get();
+
+ //copy for asynchronous write because b is reused higher up
+ System.arraycopy(b, off, _buff[_pos], 0, len);
+
+ //submit write request
+ _locks[_pos] = _pool.submit(() -> writeBuffer(_buff[_pos], 0, len));
+ _pos = (_pos+1) % _buff.length;
+ }
+ }
+ catch(Exception ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ public void writeBuffer(byte[] b, int off, int len) {
+ try {
+ out.write(b, off, len);
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ synchronized(_buff) {
+ for(int i=0; i<_buff.length; i++)
+ _locks[i].get();
+ }
+ }
+ catch(Exception ex) {
+ throw new IOException(ex);
+ }
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ _pool.shutdown();
+ super.close();
+ }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
index 88b1b66ffe..99a5131115 100644
--- a/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
@@ -28,6 +28,7 @@ import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -54,6 +55,7 @@ import org.apache.sysds.runtime.matrix.data.Pair;
public class LocalFileUtils
{
public static final int BUFFER_SIZE = 8192;
+ //min in-memory size (per getInMemorySize(), presumably in bytes, i.e., ~100KB)
+ //from which matrix/frame/cache block writes enable double buffering
+ public static final int DOUBLE_BUFFERING_MIN = 100*1024;
//unique IDs per JVM for tmp files
private static IDSequence _seq = null;
@@ -197,7 +199,7 @@ public class LocalFileUtils
* @throws IOException if IOException occurs
*/
public static void writeMatrixBlockToLocal(String fname, MatrixBlock mb) throws IOException {
- writeWritableToLocal(fname, mb);
+ writeWritableToLocal(fname, mb, mb.getInMemorySize()>=DOUBLE_BUFFERING_MIN);
}
/** Writes a frame block to local file system.
@@ -207,7 +209,7 @@ public class LocalFileUtils
* @throws IOException if IOException occurs
*/
public static void writeFrameBlockToLocal(String fname, FrameBlock fb) throws IOException {
- writeWritableToLocal(fname, fb);
+ writeWritableToLocal(fname, fb, fb.getInMemorySize()>=DOUBLE_BUFFERING_MIN);
}
/** Writes a matrix/frame block to local file system.
@@ -217,7 +219,7 @@ public class LocalFileUtils
* @throws IOException if IOException occurs
*/
public static void writeCacheBlockToLocal(String fname, CacheBlock<?> cb) throws IOException {
- writeWritableToLocal(fname, cb);
+ writeWritableToLocal(fname, cb, cb.getInMemorySize()>=DOUBLE_BUFFERING_MIN);
}
/**
@@ -226,21 +228,24 @@ public class LocalFileUtils
*
* @param fname file name to write
* @param mb Hadoop writable
+ * @param doubleBuffering if true, overlap serialization and I/O via double buffering
* @throws IOException if IOException occurs
*/
- public static void writeWritableToLocal(String fname, Writable mb)
+ public static void writeWritableToLocal(String fname, Writable mb, boolean doubleBuffering)
throws IOException
- {
- FileOutputStream fos = new FileOutputStream( fname );
+ {
+ OutputStream fos = new FileOutputStream( fname );
+ //optionally overlap serialization (into out) and file I/O (async writes of fos)
+ if( doubleBuffering )
+ fos = new DoubleBufferingOutputStream(fos, 2, BUFFER_SIZE);
FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE);
try {
mb.write(out);
+ //flush explicitly so that failures of pending (asynchronous) writes propagate
+ //as IOException here; closeSilently below presumably ignores close errors
+ out.flush();
}
finally {
- IOUtilFunctions.closeSilently(out);
+ IOUtilFunctions.closeSilently(out); //incl double buffering
IOUtilFunctions.closeSilently(fos);
- }
+ }
}
public static void writeByteArrayToLocal( String fname, byte[] data )