You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/05/04 02:40:57 UTC

systemml git commit: [SYSTEMML-2297] Fix spark left indexing w/ partitioned broadcasts

Repository: systemml
Updated Branches:
  refs/heads/master e7d948f9c -> 1791c1a26


[SYSTEMML-2297] Fix spark left indexing w/ partitioned broadcasts

This patch fixes the distributed spark left indexing operations which
failed due to index-out-of-bounds errors for partitioned broadcasts and
unaligned blocks crossing multiple partitions, while other partitions
are not touched at all. We resolve this by simply moving the slice over
1k x 1k blocks from the broadcast partition into the partitioned
broadcast, because slice relies only on the getBlock abstraction anyway.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1791c1a2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1791c1a2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1791c1a2

Branch: refs/heads/master
Commit: 1791c1a26279445726416c0cf8ce55257da19840
Parents: e7d948f
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu May 3 17:43:07 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Thu May 3 17:43:07 2018 -0700

----------------------------------------------------------------------
 .../spark/data/PartitionedBlock.java            | 56 -----------------
 .../spark/data/PartitionedBroadcast.java        | 66 ++++++++++++++++----
 2 files changed, 55 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/1791c1a2/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index 870d2a2..8a4999b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -27,18 +27,13 @@ import java.io.ObjectInput;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
-import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
 import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
-import org.apache.sysml.runtime.util.IndexRange;
 
 /**
  * This class is for partitioned matrix/frame blocks, to be used as broadcasts. 
@@ -195,57 +190,6 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		
 		return ret;
 	}
-
-	/**
-	 * Utility for slice operations over partitioned matrices, where the index range can cover
-	 * multiple blocks. The result is always a single result matrix block. All semantics are 
-	 * equivalent to the core matrix block slice operations. 
-	 * 
-	 * @param rl row lower bound
-	 * @param ru row upper bound
-	 * @param cl column lower bound
-	 * @param cu column upper bound
-	 * @param block block object
-	 * @return block object
-	 */
-	@SuppressWarnings("unchecked")
-	public T slice(long rl, long ru, long cl, long cu, T block) {
-		int lrl = (int) rl;
-		int lru = (int) ru;
-		int lcl = (int) cl;
-		int lcu = (int) cu;
-		
-		ArrayList<Pair<?, ?>> allBlks = (ArrayList<Pair<?, ?>>) CacheBlockFactory.getPairList(block);
-		int start_iix = (lrl-1)/_brlen+1;
-		int end_iix = (lru-1)/_brlen+1;
-		int start_jix = (lcl-1)/_bclen+1;
-		int end_jix = (lcu-1)/_bclen+1;
-				
-		for( int iix = start_iix; iix <= end_iix; iix++ )
-			for(int jix = start_jix; jix <= end_jix; jix++) {
-				IndexRange ixrange = new IndexRange(rl, ru, cl, cu);
-				allBlks.addAll(OperationsOnMatrixValues.performSlice(
-						ixrange, _brlen, _bclen, iix, jix, getBlock(iix, jix)));
-			}
-		
-		if(allBlks.size() == 1) {
-			return (T) allBlks.get(0).getValue();
-		}
-		else {
-			//allocate output matrix
-			Constructor<?> constr;
-			try {
-				constr = block.getClass().getConstructor(int.class, int.class, boolean.class);
-				T ret = (T) constr.newInstance(lru-lrl+1, lcu-lcl+1, false);
-				for(Pair<?, ?> kv : allBlks) {
-					ret.merge((T)kv.getValue(), false);
-				}
-				return ret;
-			} catch (Exception e) {
-				throw new DMLRuntimeException(e);
-			}
-		}
-	}
 	
 	public void clearBlocks() {
 		_partBlocks = null;

http://git-wip-us.apache.org/repos/asf/systemml/blob/1791c1a2/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
index 94fcee4..a4c5173 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
@@ -20,11 +20,18 @@
 package org.apache.sysml.runtime.instructions.spark.data;
 
 import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
 
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
+import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.util.IndexRange;
 
 /**
  * This class is a wrapper around an array of broadcasts of partitioned matrix/frame blocks,
@@ -90,20 +97,57 @@ public class PartitionedBroadcast<T extends CacheBlock> implements Serializable
 		return _pbc[pix].value().getBlock(rowIndex, colIndex);
 	}
 	
+	/**
+	 * Utility for slice operations over partitioned matrices, where the index range can cover
+	 * multiple blocks. The result is always a single result matrix block. All semantics are 
+	 * equivalent to the core matrix block slice operations. 
+	 * 
+	 * @param rl row lower bound
+	 * @param ru row upper bound
+	 * @param cl column lower bound
+	 * @param cu column upper bound
+	 * @param block block object
+	 * @return block object
+	 */
+	@SuppressWarnings("unchecked")
 	public T slice(long rl, long ru, long cl, long cu, T block) {
-		T ret = null;
-		for( Broadcast<PartitionedBlock<T>> bc : _pbc ) {
-			PartitionedBlock<T> pm = bc.value();
-			T tmp = pm.slice(rl, ru, cl, cu, block);
-			if( ret != null )
-				ret.merge(tmp, false);
-			else
-				ret = tmp;
-		}
+		int lrl = (int) rl;
+		int lru = (int) ru;
+		int lcl = (int) cl;
+		int lcu = (int) cu;
+		
+		ArrayList<Pair<?, ?>> allBlks = (ArrayList<Pair<?, ?>>) CacheBlockFactory.getPairList(block);
+		int start_iix = (lrl-1)/_mc.getRowsPerBlock()+1;
+		int end_iix = (lru-1)/_mc.getRowsPerBlock()+1;
+		int start_jix = (lcl-1)/_mc.getColsPerBlock()+1;
+		int end_jix = (lcu-1)/_mc.getColsPerBlock()+1;
+				
+		for( int iix = start_iix; iix <= end_iix; iix++ )
+			for(int jix = start_jix; jix <= end_jix; jix++) {
+				IndexRange ixrange = new IndexRange(rl, ru, cl, cu);
+				allBlks.addAll(OperationsOnMatrixValues.performSlice(
+					ixrange, _mc.getRowsPerBlock(), _mc.getColsPerBlock(), iix, jix, getBlock(iix, jix)));
+			}
 		
-		return ret;
+		if(allBlks.size() == 1) {
+			return (T) allBlks.get(0).getValue();
+		}
+		else {
+			//allocate output matrix
+			Constructor<?> constr;
+			try {
+				constr = block.getClass().getConstructor(int.class, int.class, boolean.class);
+				T ret = (T) constr.newInstance(lru-lrl+1, lcu-lcl+1, false);
+				for(Pair<?, ?> kv : allBlks) {
+					ret.merge((T)kv.getValue(), false);
+				}
+				return ret;
+			} catch (Exception e) {
+				throw new DMLRuntimeException(e);
+			}
+		}
 	}
-
+	
 	/**
 	 * This method cleanups all underlying broadcasts of a partitioned broadcast,
 	 * by forward the calls to SparkExecutionContext.cleanupBroadcastVariable.