You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/05/04 02:40:57 UTC
systemml git commit: [SYSTEMML-2297] Fix spark left indexing w/
partitioned broadcasts
Repository: systemml
Updated Branches:
refs/heads/master e7d948f9c -> 1791c1a26
[SYSTEMML-2297] Fix spark left indexing w/ partitioned broadcasts
This patch fixes the distributed spark left indexing operations, which
failed with index-out-of-bounds errors for partitioned broadcasts and
unaligned blocks crossing multiple partitions, while other partitions
are not touched at all. We resolve this by simply moving the slice over
1k x 1k blocks from the broadcast partition (PartitionedBlock) into the
partitioned broadcast (PartitionedBroadcast), because it relies only on
the getBlock abstraction anyway.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1791c1a2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1791c1a2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1791c1a2
Branch: refs/heads/master
Commit: 1791c1a26279445726416c0cf8ce55257da19840
Parents: e7d948f
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu May 3 17:43:07 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Thu May 3 17:43:07 2018 -0700
----------------------------------------------------------------------
.../spark/data/PartitionedBlock.java | 56 -----------------
.../spark/data/PartitionedBroadcast.java | 66 ++++++++++++++++----
2 files changed, 55 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/1791c1a2/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index 870d2a2..8a4999b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -27,18 +27,13 @@ import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
import java.util.Arrays;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
-import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.data.Pair;
import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
-import org.apache.sysml.runtime.util.IndexRange;
/**
* This class is for partitioned matrix/frame blocks, to be used as broadcasts.
@@ -195,57 +190,6 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
return ret;
}
-
- /**
- * Utility for slice operations over partitioned matrices, where the index range can cover
- * multiple blocks. The result is always a single result matrix block. All semantics are
- * equivalent to the core matrix block slice operations.
- *
- * @param rl row lower bound
- * @param ru row upper bound
- * @param cl column lower bound
- * @param cu column upper bound
- * @param block block object
- * @return block object
- */
- @SuppressWarnings("unchecked")
- public T slice(long rl, long ru, long cl, long cu, T block) {
- int lrl = (int) rl;
- int lru = (int) ru;
- int lcl = (int) cl;
- int lcu = (int) cu;
-
- ArrayList<Pair<?, ?>> allBlks = (ArrayList<Pair<?, ?>>) CacheBlockFactory.getPairList(block);
- int start_iix = (lrl-1)/_brlen+1;
- int end_iix = (lru-1)/_brlen+1;
- int start_jix = (lcl-1)/_bclen+1;
- int end_jix = (lcu-1)/_bclen+1;
-
- for( int iix = start_iix; iix <= end_iix; iix++ )
- for(int jix = start_jix; jix <= end_jix; jix++) {
- IndexRange ixrange = new IndexRange(rl, ru, cl, cu);
- allBlks.addAll(OperationsOnMatrixValues.performSlice(
- ixrange, _brlen, _bclen, iix, jix, getBlock(iix, jix)));
- }
-
- if(allBlks.size() == 1) {
- return (T) allBlks.get(0).getValue();
- }
- else {
- //allocate output matrix
- Constructor<?> constr;
- try {
- constr = block.getClass().getConstructor(int.class, int.class, boolean.class);
- T ret = (T) constr.newInstance(lru-lrl+1, lcu-lcl+1, false);
- for(Pair<?, ?> kv : allBlks) {
- ret.merge((T)kv.getValue(), false);
- }
- return ret;
- } catch (Exception e) {
- throw new DMLRuntimeException(e);
- }
- }
- }
public void clearBlocks() {
_partBlocks = null;
http://git-wip-us.apache.org/repos/asf/systemml/blob/1791c1a2/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
index 94fcee4..a4c5173 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
@@ -20,11 +20,18 @@
package org.apache.sysml.runtime.instructions.spark.data;
import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
+import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.util.IndexRange;
/**
* This class is a wrapper around an array of broadcasts of partitioned matrix/frame blocks,
@@ -90,20 +97,57 @@ public class PartitionedBroadcast<T extends CacheBlock> implements Serializable
return _pbc[pix].value().getBlock(rowIndex, colIndex);
}
+ /**
+ * Utility for slice operations over partitioned matrices, where the index range can cover
+ * multiple blocks. The result is always a single result matrix block. All semantics are
+ * equivalent to the core matrix block slice operations.
+ *
+ * @param rl row lower bound
+ * @param ru row upper bound
+ * @param cl column lower bound
+ * @param cu column upper bound
+ * @param block block object
+ * @return block object
+ */
+ @SuppressWarnings("unchecked")
public T slice(long rl, long ru, long cl, long cu, T block) {
- T ret = null;
- for( Broadcast<PartitionedBlock<T>> bc : _pbc ) {
- PartitionedBlock<T> pm = bc.value();
- T tmp = pm.slice(rl, ru, cl, cu, block);
- if( ret != null )
- ret.merge(tmp, false);
- else
- ret = tmp;
- }
+ int lrl = (int) rl;
+ int lru = (int) ru;
+ int lcl = (int) cl;
+ int lcu = (int) cu;
+
+ ArrayList<Pair<?, ?>> allBlks = (ArrayList<Pair<?, ?>>) CacheBlockFactory.getPairList(block);
+ int start_iix = (lrl-1)/_mc.getRowsPerBlock()+1;
+ int end_iix = (lru-1)/_mc.getRowsPerBlock()+1;
+ int start_jix = (lcl-1)/_mc.getColsPerBlock()+1;
+ int end_jix = (lcu-1)/_mc.getColsPerBlock()+1;
+
+ for( int iix = start_iix; iix <= end_iix; iix++ )
+ for(int jix = start_jix; jix <= end_jix; jix++) {
+ IndexRange ixrange = new IndexRange(rl, ru, cl, cu);
+ allBlks.addAll(OperationsOnMatrixValues.performSlice(
+ ixrange, _mc.getRowsPerBlock(), _mc.getColsPerBlock(), iix, jix, getBlock(iix, jix)));
+ }
- return ret;
+ if(allBlks.size() == 1) {
+ return (T) allBlks.get(0).getValue();
+ }
+ else {
+ //allocate output matrix
+ Constructor<?> constr;
+ try {
+ constr = block.getClass().getConstructor(int.class, int.class, boolean.class);
+ T ret = (T) constr.newInstance(lru-lrl+1, lcu-lcl+1, false);
+ for(Pair<?, ?> kv : allBlks) {
+ ret.merge((T)kv.getValue(), false);
+ }
+ return ret;
+ } catch (Exception e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
}
-
+
/**
* This method cleanups all underlying broadcasts of a partitioned broadcast,
* by forward the calls to SparkExecutionContext.cleanupBroadcastVariable.