You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/08/15 19:10:51 UTC

[systemds] branch master updated: [SYSTEMDS-2624] Rework acquireRead and cleanup of federated matrices

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new fa66b9b  [SYSTEMDS-2624] Rework acquireRead and cleanup of federated matrices
fa66b9b is described below

commit fa66b9b3a6c1e5488f5a26812932fc64a7b94b3a
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Aug 15 20:58:07 2020 +0200

    [SYSTEMDS-2624] Rework acquireRead and cleanup of federated matrices
    
    The integration of federated matrices and frames into our caching and
    buffer pool framework was suboptimal because (1) repeated pinning of
    federated data into the memory of the coordinator caused repeated
    consolidation of the federated data, and (2) cleanup of federated
    data intermediates was missing. This rework fixes both issues and
    thus reduces unnecessary memory pressure at the workers and
    unnecessary data transfer at the coordinator.
    
    For the cleanup of federated data, we added special guards to ensure
    that only federated intermediates (and files thereof) are deleted,
    but NOT the original federated input files.
---
 .../sysds/runtime/controlprogram/ProgramBlock.java |  3 +-
 .../controlprogram/caching/CacheableData.java      | 37 +++++++++---
 .../controlprogram/caching/FrameObject.java        | 67 ++++++++++------------
 .../controlprogram/caching/MatrixObject.java       | 63 ++++++++++----------
 .../controlprogram/caching/TensorObject.java       |  8 +++
 .../controlprogram/context/ExecutionContext.java   |  9 ++-
 .../federated/FederatedWorkerHandler.java          |  1 +
 .../instructions/fed/FEDInstructionUtils.java      |  7 ++-
 .../functions/federated/FederatedRCBindTest.java   |  3 +-
 9 files changed, 114 insertions(+), 84 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
index a86acae..b9ef965 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
@@ -242,8 +242,9 @@ public abstract class ProgramBlock implements ParseInfo
 			
 			// try to reuse instruction result from lineage cache
 			if( !LineageCache.reuse(tmp, ec) ) {
-				// process actual instruction
 				long et0 = !ReuseCacheType.isNone() ? System.nanoTime() : 0;
+				
+				// process actual instruction
 				tmp.processInstruction(ec);
 				
 				// cache result
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index c287aeb..c809a84 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -466,10 +466,16 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		//(probe data for cache_nowrite / jvm_reuse)
 		if( _data==null && isEmpty(true) ) {
 			try {
-				if( DMLScript.STATISTICS )
-					CacheStatistics.incrementHDFSHits();
-				
-				if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() ) {
+				if( isFederated() ) {
+					_data = readBlobFromFederated( _fedMapping );
+					
+					//mark for initial local write despite read operation
+					_requiresLocalWrite = CACHING_WRITE_CACHE_ON_READ;
+				}
+				else if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() ) {
+					if( DMLScript.STATISTICS )
+						CacheStatistics.incrementHDFSHits();
+					
 					//check filename
 					if( _hdfsFileName == null )
 						throw new DMLRuntimeException("Cannot read matrix for empty filename.");
@@ -661,6 +667,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 					gObj.clearData(null, DMLScript.EAGER_CUDA_FREE);
 		}
 		
+		//clear federated matrix
+		if( _fedMapping != null )
+			_fedMapping.cleanup(_fedMapping.getID());
+		
 		// change object state EMPTY
 		setDirty(false);
 		setEmpty();
@@ -923,19 +933,30 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		return hashCode() + " " + debugNameEnding;
 	}
 
-	protected T readBlobFromHDFS(String fname) 
-		throws IOException 
-	{
+	//HDFS read
+	protected T readBlobFromHDFS(String fname) throws IOException {
 		MetaDataFormat iimd = (MetaDataFormat) _metaData;
 		DataCharacteristics dc = iimd.getDataCharacteristics();
 		return readBlobFromHDFS(fname, dc.getDims());
 	}
 
-	protected abstract T readBlobFromHDFS(String fname, long[] dims) throws IOException;
+	protected abstract T readBlobFromHDFS(String fname, long[] dims)
+		throws IOException;
 
+	//RDD read
 	protected abstract T readBlobFromRDD(RDDObject rdd, MutableBoolean status)
 		throws IOException;
 
+	// Federated read
+	protected T readBlobFromFederated(FederationMap fedMap) throws IOException {
+		MetaDataFormat iimd = (MetaDataFormat) _metaData;
+		DataCharacteristics dc = iimd.getDataCharacteristics();
+		return readBlobFromFederated(fedMap, dc.getDims());
+	}
+	
+	protected abstract T readBlobFromFederated(FederationMap fedMap, long[] dims)
+		throws IOException;
+
 	protected abstract void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
 		throws IOException;
 
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index ef6e790..0418ce7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -31,6 +31,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.FrameReaderFactory;
@@ -160,41 +161,6 @@ public class FrameObject extends CacheableData<FrameBlock>
 	}
 	
 	@Override
-	public FrameBlock acquireRead() {
-		// forward call for non-federated objects
-		if( !isFederated() )
-			return super.acquireRead();
-		
-		FrameBlock result = new FrameBlock(_schema);
-		// provide long support?
-		result.ensureAllocatedColumns((int) _metaData.getDataCharacteristics().getRows());
-		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = _fedMapping.requestFederatedData();
-		try {
-			for(Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
-				FederatedRange range = readResponse.getLeft();
-				FederatedResponse response = readResponse.getRight().get();
-				// add result
-				FrameBlock multRes = (FrameBlock) response.getData()[0];
-				for (int r = 0; r < multRes.getNumRows(); r++) {
-					for (int c = 0; c < multRes.getNumColumns(); c++) {
-						int destRow = range.getBeginDimsInt()[0] + r;
-						int destCol = range.getBeginDimsInt()[1] + c;
-						result.set(destRow, destCol, multRes.get(r, c));
-					}
-				}
-			}
-		}
-		catch(Exception e) {
-			throw new DMLRuntimeException("Federated Frame read failed.", e);
-		}
-		
-		//keep returned object for future use 
-		acquireModify(result);
-		
-		return result;
-	}
-	
-	@Override
 	protected FrameBlock readBlobFromCache(String fname) throws IOException {
 		return (FrameBlock)LazyWriteBuffer.readBlock(fname, false);
 	}
@@ -269,6 +235,36 @@ public class FrameObject extends CacheableData<FrameBlock>
 		
 		return fb;
 	}
+	
+	@Override
+	protected FrameBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
+		throws IOException
+	{
+		FrameBlock ret = new FrameBlock(_schema);
+		// provide long support?
+		ret.ensureAllocatedColumns((int) dims[0]);
+		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = fedMap.requestFederatedData();
+		try {
+			for(Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
+				FederatedRange range = readResponse.getLeft();
+				FederatedResponse response = readResponse.getRight().get();
+				// add result
+				FrameBlock multRes = (FrameBlock) response.getData()[0];
+				for (int r = 0; r < multRes.getNumRows(); r++) {
+					for (int c = 0; c < multRes.getNumColumns(); c++) {
+						int destRow = range.getBeginDimsInt()[0] + r;
+						int destCol = range.getBeginDimsInt()[1] + c;
+						ret.set(destRow, destCol, multRes.get(r, c));
+					}
+				}
+			}
+		}
+		catch(Exception e) {
+			throw new DMLRuntimeException("Federated Frame read failed.", e);
+		}
+		
+		return ret;
+	}
 
 	@Override
 	protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
@@ -290,5 +286,4 @@ public class FrameObject extends CacheableData<FrameBlock>
 		//lazy evaluation of pending transformations.
 		SparkExecutionContext.writeFrameRDDtoHDFS(rdd, fname, iimd.getFileFormat());
 	}
-
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 6216ba5..d96b700 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -39,6 +39,7 @@ import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -393,40 +394,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 
 		return sb.toString();
 	}
-	
-	@Override
-	public MatrixBlock acquireRead() {
-		// forward call for non-federated objects
-		if( !isFederated() )
-			return super.acquireRead();
-		
-		long[] dims = getDataCharacteristics().getDims();
-		// TODO sparse optimization
-		MatrixBlock result = new MatrixBlock((int) dims[0], (int) dims[1], false);
-		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = _fedMapping.requestFederatedData();
-		try {
-			for (Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
-				FederatedRange range = readResponse.getLeft();
-				FederatedResponse response = readResponse.getRight().get();
-				// add result
-				int[] beginDimsInt = range.getBeginDimsInt();
-				int[] endDimsInt = range.getEndDimsInt();
-				MatrixBlock multRes = (MatrixBlock) response.getData()[0];
-				result.copy(beginDimsInt[0], endDimsInt[0] - 1,
-					beginDimsInt[1], endDimsInt[1] - 1, multRes, false);
-				result.setNonZeros(result.getNonZeros() + multRes.getNonZeros());
-			}
-		}
-		catch (Exception e) {
-			throw new DMLRuntimeException("Federated matrix read failed.", e);
-		}
-		
-		//keep returned object for future use 
-		acquireModify(result);
-		
-		return result;
-	}
-	
+
 	// *********************************************
 	// ***                                       ***
 	// ***      LOW-LEVEL PROTECTED METHODS      ***
@@ -540,6 +508,33 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 		return mb;
 	}
 	
+	@Override
+	protected MatrixBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
+		throws IOException
+	{
+		// TODO sparse optimization
+		MatrixBlock ret = new MatrixBlock((int) dims[0], (int) dims[1], false);
+		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = fedMap.requestFederatedData();
+		try {
+			for (Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
+				FederatedRange range = readResponse.getLeft();
+				FederatedResponse response = readResponse.getRight().get();
+				// add result
+				int[] beginDimsInt = range.getBeginDimsInt();
+				int[] endDimsInt = range.getEndDimsInt();
+				MatrixBlock multRes = (MatrixBlock) response.getData()[0];
+				ret.copy(beginDimsInt[0], endDimsInt[0] - 1,
+					beginDimsInt[1], endDimsInt[1] - 1, multRes, false);
+				ret.setNonZeros(ret.getNonZeros() + multRes.getNonZeros());
+			}
+		}
+		catch (Exception e) {
+			throw new DMLRuntimeException("Federated matrix read failed.", e);
+		}
+		
+		return ret;
+	}
+	
 	/**
 	 * Writes in-memory matrix to HDFS in a specified format.
 	 */
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
index d222f43..32fb72b 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
@@ -28,6 +28,7 @@ import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.data.TensorIndexes;
 import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
@@ -141,6 +142,13 @@ public class TensorObject extends CacheableData<TensorBlock> {
 		// TODO read from RDD
 		return SparkExecutionContext.toTensorBlock((JavaPairRDD<TensorIndexes, TensorBlock>) rdd.getRDD(), tc);
 	}
+	
+	@Override
+	protected TensorBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
+		throws IOException
+	{
+		throw new DMLRuntimeException("Unsupported federated tensors");
+	}
 
 	@Override
 	protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
index fcb5db3..31a467f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
@@ -59,7 +59,9 @@ import org.apache.sysds.utils.Statistics;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class ExecutionContext {
@@ -71,6 +73,7 @@ public class ExecutionContext {
 	//symbol table
 	protected LocalVariableMap _variables;
 	protected boolean _autoCreateVars;
+	protected Set<String> _guardedFiles = new HashSet<>();
 	
 	//lineage map, cache, prepared dedup blocks
 	protected Lineage _lineage;
@@ -131,6 +134,10 @@ public class ExecutionContext {
 	public void setAutoCreateVars(boolean flag) {
 		_autoCreateVars = flag;
 	}
+	
+	public void addGuardedFilename(String fname) {
+		_guardedFiles.add(fname);
+	}
 
 	/**
 	 * Get the i-th GPUContext
@@ -751,7 +758,7 @@ public class ExecutionContext {
 			//compute ref count only if matrix cleanup actually necessary
 			if ( mo.isCleanupEnabled() && !getVariables().hasReferences(mo) )  {
 				mo.clearData(); //clean cached data
-				if( fileExists ) {
+				if( fileExists && !_guardedFiles.contains(mo.getFileName()) ) {
 					HDFSTool.deleteFileIfExistOnHDFS(mo.getFileName());
 					HDFSTool.deleteFileIfExistOnHDFS(mo.getFileName()+".mtd");
 				}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index f14bbb0..47ca43c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -181,6 +181,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		
 		//TODO spawn async load of data, otherwise on first access
 		_ec.setVariable(String.valueOf(id), cd);
+		_ec.addGuardedFilename(filename);
 		
 		if (dataType == Types.DataType.FRAME) {
 			FrameObject frameObject = (FrameObject) cd;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 00f3b04..0a5a2a2 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -54,9 +54,10 @@ public class FEDInstructionUtils {
 		}
 		else if (inst instanceof BinaryCPInstruction) {
 			BinaryCPInstruction instruction = (BinaryCPInstruction) inst;
-			if( instruction.input1.isMatrix() && ec.getMatrixObject(instruction.input1).isFederated()
-				|| instruction.input2.isMatrix() && ec.getMatrixObject(instruction.input2).isFederated() ) {
-				return BinaryFEDInstruction.parseInstruction(inst.getInstructionString());
+			if( (instruction.input1.isMatrix() && ec.getMatrixObject(instruction.input1).isFederated())
+				|| (instruction.input2.isMatrix() && ec.getMatrixObject(instruction.input2).isFederated()) ) {
+				if(!instruction.getOpcode().equals("append")) //TODO support rbind/cbind
+					return BinaryFEDInstruction.parseInstruction(inst.getInstructionString());
 			}
 		}
 		else if( inst instanceof ParameterizedBuiltinCPInstruction ) {
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
index 81c0b00..0f28a7b 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
@@ -96,7 +96,8 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-nvargs", "in=" + TestUtils.federatedAddress(port, input("A")), "rows=" + rows,
+		programArgs = new String[] {"-nvargs",
+			"in=" + TestUtils.federatedAddress(port, input("A")), "rows=" + rows,
 			"cols=" + cols, "out_R=" + output("R"), "out_C=" + output("C")};
 
 		runTest(true, false, null, -1);