You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/09/02 22:16:12 UTC

[systemds] branch master updated (4497199 -> 3d1ba3c)

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git.


    from 4497199  [MINOR] Fix bugs in lineage tracing
     new 60d16c4  [SYSTEMDS-3118] Extended parfor parser/runtime (frame result variables)
     new 3d1ba3c  [SYSTEMDS-3115,3120] Implements cleaning pipeline enumeration scripts

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 scripts/builtin/correctTypos.dml                   |  51 ++-------
 scripts/pipelines/scripts/enumerateLogical.dml     |  25 +++--
 .../apache/sysds/parser/ParForStatementBlock.java  |   3 +-
 .../runtime/controlprogram/ParForProgramBlock.java |  92 +++++++++--------
 .../controlprogram/caching/FrameObject.java        |   7 ++
 .../runtime/controlprogram/parfor/ResultMerge.java |  90 ++--------------
 .../parfor/ResultMergeFrameLocalMemory.java        | 114 +++++++++++++++++++++
 .../parfor/ResultMergeLocalAutomatic.java          |   4 +-
 .../parfor/ResultMergeLocalFile.java               |   2 +-
 .../parfor/ResultMergeLocalMemory.java             |   2 +-
 .../{ResultMerge.java => ResultMergeMatrix.java}   |  50 ++-------
 .../parfor/ResultMergeRemoteSpark.java             |   2 +-
 .../parfor/ResultMergeRemoteSparkWCompare.java     |   2 +-
 .../parfor/ParForDependencyAnalysisTest.java       |  10 +-
 ...est.java => ParForListFrameResultVarsTest.java} |  22 +++-
 .../loop/leftMult.dml => parfor/parfor54e.dml}     |  11 +-
 .../loop/leftMult.dml => parfor/parfor54f.dml}     |  11 +-
 ...for_listResults.dml => parfor_frameResults.dml} |  11 +-
 18 files changed, 256 insertions(+), 253 deletions(-)
 create mode 100644 src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeFrameLocalMemory.java
 copy src/main/java/org/apache/sysds/runtime/controlprogram/parfor/{ResultMerge.java => ResultMergeMatrix.java} (67%)
 rename src/test/java/org/apache/sysds/test/functions/parfor/misc/{ParForListResultVarsTest.java => ParForListFrameResultVarsTest.java} (75%)
 copy src/test/scripts/component/{compress/workload/loop/leftMult.dml => parfor/parfor54e.dml} (86%)
 copy src/test/scripts/component/{compress/workload/loop/leftMult.dml => parfor/parfor54f.dml} (86%)
 copy src/test/scripts/functions/parfor/{parfor_listResults.dml => parfor_frameResults.dml} (84%)

[systemds] 01/02: [SYSTEMDS-3118] Extended parfor parser/runtime (frame result variables)

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 60d16c474b76ecb4d45d3cd6e36580672fc6f1da
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Fri Sep 3 00:12:53 2021 +0200

    [SYSTEMDS-3118] Extended parfor parser/runtime (frame result variables)
    
    This patch extends parfor with support for frame result variables during
    dependency analysis and during the merge of worker result variables. So
    far, frame results are always merged in memory, regardless of the
    configured result-merge strategy.
---
 .../apache/sysds/parser/ParForStatementBlock.java  |   3 +-
 .../runtime/controlprogram/ParForProgramBlock.java |  92 +++++++++--------
 .../controlprogram/caching/FrameObject.java        |   7 ++
 .../runtime/controlprogram/parfor/ResultMerge.java |  90 ++--------------
 .../parfor/ResultMergeFrameLocalMemory.java        | 114 +++++++++++++++++++++
 .../parfor/ResultMergeLocalAutomatic.java          |   4 +-
 .../parfor/ResultMergeLocalFile.java               |   2 +-
 .../parfor/ResultMergeLocalMemory.java             |   2 +-
 .../{ResultMerge.java => ResultMergeMatrix.java}   |  50 ++-------
 .../parfor/ResultMergeRemoteSpark.java             |   2 +-
 .../parfor/ResultMergeRemoteSparkWCompare.java     |   2 +-
 .../parfor/ParForDependencyAnalysisTest.java       |  10 +-
 ...est.java => ParForListFrameResultVarsTest.java} |  22 +++-
 src/test/scripts/component/parfor/parfor54e.dml    |  26 +++++
 src/test/scripts/component/parfor/parfor54f.dml    |  26 +++++
 .../functions/parfor/parfor_frameResults.dml       |  32 ++++++
 16 files changed, 306 insertions(+), 178 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
index 74c55c5..607641c 100644
--- a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
@@ -677,7 +677,7 @@ public class ParForStatementBlock extends ForStatementBlock
 						for(DataIdentifier write : datsUpdated) {
 							if( !c._var.equals( write.getName() ) ) continue;
 							
-							if( cdt != DataType.MATRIX && cdt != DataType.LIST ) {
+							if( cdt != DataType.MATRIX && cdt != DataType.FRAME && cdt != DataType.LIST ) {
 								//cannot infer type, need to exit (conservative approach)
 								throw new LanguageException("PARFOR loop dependency analysis: cannot check "
 									+ "for dependencies due to unknown datatype of var '"+c._var+"': "+cdt.name()+".");
@@ -716,6 +716,7 @@ public class ParForStatementBlock extends ForStatementBlock
 								return;
 						}
 						else if( (cdt == DataType.MATRIX && dat2dt == DataType.MATRIX)
+							|| (cdt == DataType.FRAME && dat2dt == DataType.FRAME )
 							|| (cdt == DataType.LIST && dat2dt == DataType.LIST ) )
 						{
 							boolean invalid = false;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 25d49bb..42ab8bc 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -38,6 +38,7 @@ import org.apache.sysds.parser.StatementBlock;
 import org.apache.sysds.parser.VariableSet;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -51,6 +52,7 @@ import org.apache.sysds.runtime.controlprogram.parfor.RemoteDPParForSpark;
 import org.apache.sysds.runtime.controlprogram.parfor.RemoteParForJobReturn;
 import org.apache.sysds.runtime.controlprogram.parfor.RemoteParForSpark;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMerge;
+import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeFrameLocalMemory;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalAutomatic;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalFile;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalMemory;
@@ -1056,9 +1058,9 @@ public class ParForProgramBlock extends ForProgramBlock
 	 * @param out output matrix
 	 * @param in array of input matrix objects
 	 */
-	private static void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in, boolean parallel) {
+	private static void cleanWorkerResultVariables(ExecutionContext ec, CacheableData<?> out, CacheableData<?>[] in, boolean parallel) {
 		//check for empty inputs (no iterations executed)
-		Stream<MatrixObject> results = Arrays.stream(in).filter(m -> m!=null && m!=out);
+		Stream<CacheableData<?>> results = Arrays.stream(in).filter(m -> m!=null && m!=out);
 		//perform cleanup (parallel to mitigate file deletion bottlenecks)
 		(parallel ? results.parallel() : results)
 			.forEach(m -> ec.cleanupCacheableData(m));
@@ -1307,33 +1309,41 @@ public class ParForProgramBlock extends ForProgramBlock
 		return dp;
 	}
 
-	private ResultMerge createResultMerge( PResultMerge prm, MatrixObject out, MatrixObject[] in, String fname, boolean accum, ExecutionContext ec ) 
+	private ResultMerge<?> createResultMerge( PResultMerge prm,
+		CacheableData<?> out, CacheableData<?>[] in, String fname, boolean accum, ExecutionContext ec ) 
 	{
-		ResultMerge rm = null;
+		ResultMerge<?> rm = null;
 		
-		//create result merge implementation (determine degree of parallelism 
-		//only for spark to avoid unnecessary spark context creation)
-		switch( prm )
-		{
-			case LOCAL_MEM:
-				rm = new ResultMergeLocalMemory( out, in, fname, accum );
-				break;
-			case LOCAL_FILE:
-				rm = new ResultMergeLocalFile( out, in, fname, accum );
-				break;
-			case LOCAL_AUTOMATIC:
-				rm = new ResultMergeLocalAutomatic( out, in, fname, accum );
-				break;
-			case REMOTE_SPARK:
-				int numMap = Math.max(_numThreads,
-					SparkExecutionContext.getDefaultParallelism(true));
-				int numRed = numMap; //equal map/reduce
-				rm = new ResultMergeRemoteSpark( out, in,
-					fname, accum, ec, numMap, numRed );
-				break;
-				
-			default:
-				throw new DMLRuntimeException("Undefined result merge: '" +prm.toString()+"'.");
+		if( out instanceof FrameObject ) {
+			rm = new ResultMergeFrameLocalMemory((FrameObject)out, (FrameObject[])in, fname, accum);
+		}
+		else if(out instanceof MatrixObject) {
+			//create result merge implementation (determine degree of parallelism 
+			//only for spark to avoid unnecessary spark context creation)
+			switch( prm )
+			{
+				case LOCAL_MEM:
+					rm = new ResultMergeLocalMemory( (MatrixObject)out, (MatrixObject[])in, fname, accum );
+					break;
+				case LOCAL_FILE:
+					rm = new ResultMergeLocalFile( (MatrixObject)out, (MatrixObject[])in, fname, accum );
+					break;
+				case LOCAL_AUTOMATIC:
+					rm = new ResultMergeLocalAutomatic( (MatrixObject)out, (MatrixObject[])in, fname, accum );
+					break;
+				case REMOTE_SPARK:
+					int numMap = Math.max(_numThreads,
+						SparkExecutionContext.getDefaultParallelism(true));
+					int numRed = numMap; //equal map/reduce
+					rm = new ResultMergeRemoteSpark( (MatrixObject)out,
+						(MatrixObject[])in, fname, accum, ec, numMap, numRed );
+					break;
+				default:
+					throw new DMLRuntimeException("Undefined result merge: '" +prm.toString()+"'.");
+			}
+		}
+		else {
+			throw new DMLRuntimeException("Unsupported result merge data: "+out.getClass().getSimpleName());
 		}
 		
 		return rm;
@@ -1437,14 +1447,15 @@ public class ParForProgramBlock extends ForProgramBlock
 			{
 				Data dat = ec.getVariable(var._name);
 				
-				if( dat instanceof MatrixObject ) //robustness scalars
+				if( dat instanceof MatrixObject | dat instanceof FrameObject )
 				{
-					MatrixObject out = (MatrixObject) dat;
-					MatrixObject[] in = Arrays.stream(results).map(vars -> 
-						vars.get(var._name)).toArray(MatrixObject[]::new);
+					CacheableData<?> out = (CacheableData<?>) dat;
+					Stream<Object> tmp = Arrays.stream(results).map(vars -> vars.get(var._name));
+					CacheableData<?>[] in = (dat instanceof MatrixObject) ?
+						tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new);
 					String fname = constructResultMergeFileName();
-					ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec);
-					MatrixObject outNew = USE_PARALLEL_RESULT_MERGE ?
+					ResultMerge<?> rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec);
+					CacheableData<?> outNew = USE_PARALLEL_RESULT_MERGE ?
 						rm.executeParallelMerge(_numThreads) :
 						rm.executeSerialMerge();
 					
@@ -1653,18 +1664,19 @@ public class ParForProgramBlock extends ForProgramBlock
 					if( var == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks)
 						break;
 				
-					MatrixObject out = null;
+					CacheableData<?> out = null;
 					synchronized( _ec.getVariables() ){
-						out = _ec.getMatrixObject(var._name);
+						out = _ec.getCacheableData(var._name);
 					}
 					
-					MatrixObject[] in = new MatrixObject[ _refVars.length ];
-					for( int i=0; i< _refVars.length; i++ )
-						in[i] = (MatrixObject) _refVars[i].get( var._name ); 
+					Stream<Object> tmp = Arrays.stream(_refVars).map(vars -> vars.get(var._name));
+					CacheableData<?>[] in = (out instanceof MatrixObject) ?
+						tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new);
+					
 					String fname = constructResultMergeFileName();
 				
-					ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec);
-					MatrixObject outNew = null;
+					ResultMerge<?> rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec);
+					CacheableData<?> outNew = null;
 					if( USE_PARALLEL_RESULT_MERGE )
 						outNew = rm.executeParallelMerge( _numThreads );
 					else
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 5eae986..4485388 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -41,6 +41,7 @@ import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.meta.MetaData;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
 import org.apache.sysds.runtime.util.UtilFunctions;
@@ -86,6 +87,12 @@ public class FrameObject extends CacheableData<FrameBlock>
 	 */
 	public FrameObject(FrameObject fo) {
 		super(fo);
+		
+		MetaDataFormat metaOld = (MetaDataFormat) fo.getMetaData();
+		_metaData = new MetaDataFormat(
+			new MatrixCharacteristics(metaOld.getDataCharacteristics()),
+			metaOld.getFileFormat());
+		_schema = fo._schema.clone();
 	}
 	
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java
index 18b09a1..b69ba96 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java
@@ -21,42 +21,33 @@ package org.apache.sysds.runtime.controlprogram.parfor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 
 import java.io.Serializable;
-import java.util.List;
 
-/**
- * Due to independence of all iterations, any result has the following properties:
- * (1) non local var, (2) matrix object, and (3) completely independent.
- * These properties allow us to realize result merging in parallel without any synchronization. 
- * 
- */
-public abstract class ResultMerge implements Serializable
+public abstract class ResultMerge<T extends CacheableData<?>> implements Serializable
 {
 	//note: this class needs to be serializable to ensure that all attributes of
 	//ResultMergeRemoteSparkWCompare are included in the task closure
-	private static final long serialVersionUID = 2620430969346516677L;
+	private static final long serialVersionUID = -6756689640511059030L;
 	
 	protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName());
 	protected static final String NAME_SUFFIX = "_rm";
 	protected static final BinaryOperator PLUS = InstructionUtils.parseBinaryOperator("+");
 	
 	//inputs to result merge
-	protected MatrixObject   _output      = null;
-	protected MatrixObject[] _inputs      = null; 
-	protected String         _outputFName = null;
-	protected boolean        _isAccum     = false;
+	protected T       _output      = null;
+	protected T[]     _inputs      = null; 
+	protected String  _outputFName = null;
+	protected boolean _isAccum     = false;
 	
 	protected ResultMerge( ) {
 		//do nothing
 	}
 	
-	public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) {
+	public ResultMerge( T out, T[] in, String outputFilename, boolean accum ) {
 		_output = out;
 		_inputs = in;
 		_outputFName = outputFilename;
@@ -70,7 +61,7 @@ public abstract class ResultMerge implements Serializable
 	 * 
 	 * @return output (merged) matrix
 	 */
-	public abstract MatrixObject executeSerialMerge();
+	public abstract T executeSerialMerge();
 	
 	/**
 	 * Merge all given input matrices in parallel into the given output matrix.
@@ -80,67 +71,6 @@ public abstract class ResultMerge implements Serializable
 	 * @param par degree of parallelism
 	 * @return output (merged) matrix
 	 */
-	public abstract MatrixObject executeParallelMerge( int par );
-	
-	protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) {
-		mergeWithoutComp(out, in, appendOnly, false);
-	}
+	public abstract T executeParallelMerge(int par);
 	
-	protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly, boolean par ) {
-		//pass through to matrix block operations
-		if( _isAccum )
-			out.binaryOperationsInPlace(PLUS, in);
-		else
-			out.merge(in, appendOnly, par);
-	}
-
-	/**
-	 * NOTE: append only not applicable for wiht compare because output must be populated with
-	 * initial state of matrix - with append, this would result in duplicates.
-	 * 
-	 * @param out output matrix block
-	 * @param in input matrix block
-	 * @param compare ?
-	 */
-	protected void mergeWithComp( MatrixBlock out, MatrixBlock in, DenseBlock compare ) 
-	{
-		//Notes for result correctness:
-		// * Always iterate over entire block in order to compare all values 
-		//   (using sparse iterator would miss values set to 0) 
-		// * Explicit NaN awareness because for cases were original matrix contains
-		//   NaNs, since NaN != NaN, otherwise we would potentially overwrite results
-		// * For the case of accumulation, we add out += (new-old) to ensure correct results
-		//   because all inputs have the old values replicated
-		
-		if( in.isEmptyBlock(false) ) {
-			if( _isAccum ) return; //nothing to do
-			for( int i=0; i<in.getNumRows(); i++ )
-				for( int j=0; j<in.getNumColumns(); j++ )
-					if( compare.get(i, j) != 0 )
-						out.quickSetValue(i, j, 0);
-		}
-		else { //SPARSE/DENSE
-			int rows = in.getNumRows();
-			int cols = in.getNumColumns();
-			for( int i=0; i<rows; i++ )
-				for( int j=0; j<cols; j++ ) {
-					double valOld = compare.get(i,j);
-					double valNew = in.quickGetValue(i,j); //input value
-					if( (valNew != valOld && !Double.isNaN(valNew) )      //for changed values 
-						|| Double.isNaN(valNew) != Double.isNaN(valOld) ) //NaN awareness 
-					{
-						double value = !_isAccum ? valNew :
-							(out.quickGetValue(i, j) + (valNew - valOld));
-						out.quickSetValue(i, j, value);
-					}
-				}
-		}
-	}
-
-	protected long computeNonZeros( MatrixObject out, List<MatrixObject> in ) {
-		//sum of nnz of input (worker result) - output var existing nnz
-		long outNNZ = out.getDataCharacteristics().getNonZeros();
-		return outNNZ - in.size() * outNNZ + in.stream()
-			.mapToLong(m -> m.getDataCharacteristics().getNonZeros()).sum();
-	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeFrameLocalMemory.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeFrameLocalMemory.java
new file mode 100644
index 0000000..cd2d99f
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeFrameLocalMemory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.parfor;
+
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.util.UtilFunctions;
+
+public class ResultMergeFrameLocalMemory extends ResultMerge<FrameObject>
+{
+	private static final long serialVersionUID = 549739254879310540L;
+	
+	public ResultMergeFrameLocalMemory(FrameObject out, FrameObject[] in, String outputFilename, boolean accum) {
+		super( out, in, outputFilename, accum );
+	}
+	
+	@Override
+	public FrameObject executeSerialMerge() 
+	{
+		FrameObject foNew = null; //always create new matrix object (required for nested parallelism)
+		
+		if( LOG.isTraceEnabled() )
+			LOG.trace("ResultMerge (local, in-memory): Execute serial merge for output "
+				+_output.hashCode()+" (fname="+_output.getFileName()+")");
+		
+		try
+		{
+			//get old and new output frame blocks
+			FrameBlock outFB = _output.acquireRead();
+			FrameBlock outFBNew = new FrameBlock(outFB);
+			
+			//create compare matrix if required (existing data in result)
+			FrameBlock compare = outFB;
+			int rlen = compare.getNumRows();
+			int clen = compare.getNumColumns();
+			
+			//serial merge all inputs
+			boolean flagMerged = false;
+			for( FrameObject in : _inputs )
+			{
+				//check for empty inputs (no iterations executed)
+				if( in != null && in != _output ) 
+				{
+					if( LOG.isTraceEnabled() )
+						LOG.trace("ResultMergeFrame (local, in-memory): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
+					
+					//read/pin input_i
+					FrameBlock inMB = in.acquireRead();
+					
+					//core merge
+					for(int j=0; j<clen; j++) {
+						ValueType vt = compare.getSchema()[j];
+						for(int i=0; i<rlen; i++) {
+							Object val1 = compare.get(i, j);
+							Object val2 = inMB.get(i, j);
+							if( UtilFunctions.compareTo(vt, val1, val2) != 0 )
+								outFBNew.set(i, j, val2);
+						}
+					}
+					
+					//unpin and clear in-memory input_i
+					in.release();
+					in.clearData();
+					flagMerged = true;
+				}
+			}
+			
+			//create output and release old output
+			foNew =  flagMerged ? createNewFrameObject(_output, outFBNew) : _output;
+			_output.release();
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+
+		//LOG.trace("ResultMerge (local, in-memory): Executed serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+") in "+time.stop()+"ms");
+		
+		return foNew;
+	}
+	
+	@Override
+	public FrameObject executeParallelMerge( int par ) {
+		if( LOG.isTraceEnabled() )
+			LOG.trace("ResultMerge (local, in-memory): Execute parallel (par="+par+") "
+				+ "merge for output "+_output.hashCode()+" (fname="+_output.getFileName()+")");
+		return executeSerialMerge();
+	}
+
+	private static FrameObject createNewFrameObject( FrameObject foOld, FrameBlock dataNew ) {
+		FrameObject ret = new FrameObject(foOld);
+		ret.acquireModify(dataNew);
+		ret.release();
+		return ret;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
index 92ec8f9..ea5195d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
@@ -26,11 +26,11 @@ import org.apache.sysds.runtime.controlprogram.parfor.opt.OptimizerRuleBased;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 
-public class ResultMergeLocalAutomatic extends ResultMerge
+public class ResultMergeLocalAutomatic extends ResultMergeMatrix
 {
 	private static final long serialVersionUID = 1600893100602101732L;
 	
-	private ResultMerge _rm = null;
+	private ResultMergeMatrix _rm = null;
 	
 	public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) {
 		super( out, in, outputFilename, accum );
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index db3d741..441ba3e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -67,7 +67,7 @@ import java.util.Map.Entry;
  *      NOTE: file merge typically used due to memory constraints - parallel merge would increase the memory
  *      consumption again.
  */
-public class ResultMergeLocalFile extends ResultMerge
+public class ResultMergeLocalFile extends ResultMergeMatrix
 {
 	private static final long serialVersionUID = -6905893742840020489L;
 
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
index 5c604dd..f422423 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
@@ -39,7 +39,7 @@ import java.util.ArrayList;
  * 
  * 
  */
-public class ResultMergeLocalMemory extends ResultMerge
+public class ResultMergeLocalMemory extends ResultMergeMatrix
 {
 	private static final long serialVersionUID = -3543612508601511701L;
 	
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
similarity index 67%
copy from src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java
copy to src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
index 18b09a1..7d0776c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
@@ -19,13 +19,9 @@
 
 package org.apache.sysds.runtime.controlprogram.parfor;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.data.DenseBlock;
-import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 
 import java.io.Serializable;
 import java.util.List;
@@ -36,52 +32,18 @@ import java.util.List;
  * These properties allow us to realize result merging in parallel without any synchronization. 
  * 
  */
-public abstract class ResultMerge implements Serializable
+public abstract class ResultMergeMatrix extends ResultMerge<MatrixObject> implements Serializable
 {
-	//note: this class needs to be serializable to ensure that all attributes of
-	//ResultMergeRemoteSparkWCompare are included in the task closure
-	private static final long serialVersionUID = 2620430969346516677L;
+	private static final long serialVersionUID = 5319002218804570071L;
 	
-	protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName());
-	protected static final String NAME_SUFFIX = "_rm";
-	protected static final BinaryOperator PLUS = InstructionUtils.parseBinaryOperator("+");
-	
-	//inputs to result merge
-	protected MatrixObject   _output      = null;
-	protected MatrixObject[] _inputs      = null; 
-	protected String         _outputFName = null;
-	protected boolean        _isAccum     = false;
-	
-	protected ResultMerge( ) {
-		//do nothing
+	public ResultMergeMatrix() {
+		super();
 	}
 	
-	public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) {
-		_output = out;
-		_inputs = in;
-		_outputFName = outputFilename;
-		_isAccum = accum;
+	public ResultMergeMatrix(MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum) {
+		super(out, in, outputFilename, accum);
 	}
 	
-	/**
-	 * Merge all given input matrices sequentially into the given output matrix.
-	 * The required space in-memory is the size of the output matrix plus the size
-	 * of one input matrix at a time.
-	 * 
-	 * @return output (merged) matrix
-	 */
-	public abstract MatrixObject executeSerialMerge();
-	
-	/**
-	 * Merge all given input matrices in parallel into the given output matrix.
-	 * The required space in-memory is the size of the output matrix plus the size
-	 * of all input matrices.
-	 * 
-	 * @param par degree of parallelism
-	 * @return output (merged) matrix
-	 */
-	public abstract MatrixObject executeParallelMerge( int par );
-	
 	protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) {
 		mergeWithoutComp(out, in, appendOnly, false);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index 8a70ecf..6f33225 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -44,7 +44,7 @@ import org.apache.sysds.utils.Statistics;
 
 import java.util.Arrays;
 
-public class ResultMergeRemoteSpark extends ResultMerge
+public class ResultMergeRemoteSpark extends ResultMergeMatrix
 {
 	private static final long serialVersionUID = -6924566953903424820L;
 	
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
index a152c52..6b8d424 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
@@ -31,7 +31,7 @@ import org.apache.sysds.runtime.util.DataConverter;
 
 import scala.Tuple2;
 
-public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,MatrixBlock>>, MatrixIndexes, MatrixBlock>
+public class ResultMergeRemoteSparkWCompare extends ResultMergeMatrix implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,MatrixBlock>>, MatrixIndexes, MatrixBlock>
 {
 	private static final long serialVersionUID = -5970805069405942836L;
 	
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/ParForDependencyAnalysisTest.java b/src/test/java/org/apache/sysds/test/component/parfor/ParForDependencyAnalysisTest.java
index 04f575a..cf7c71a 100644
--- a/src/test/java/org/apache/sysds/test/component/parfor/ParForDependencyAnalysisTest.java
+++ b/src/test/java/org/apache/sysds/test/component/parfor/ParForDependencyAnalysisTest.java
@@ -66,8 +66,8 @@ import org.apache.sysds.test.TestConfiguration;
  *    49a: dep, 49b: dep
  * * accumulators
  *    53a: no, 53b dep, 53c dep, 53d dep, 53e dep
- * * lists
- *    54a: no, 54b: no, 54c: dep, 54d: dep
+ * * lists/frames
+ *    54a: no, 54b: no, 54c: dep, 54d: dep, 54e: no-dep, 54f: dep
  * * negative loop increment
  *    55a: no, 55b: yes
  */
@@ -328,6 +328,12 @@ public class ParForDependencyAnalysisTest extends AutomatedTestBase
 	public void testDependencyAnalysis54d() { runTest("parfor54d.dml", true); }
 	
 	@Test
+	public void testDependencyAnalysis54e() { runTest("parfor54e.dml", false); }
+	
+	@Test
+	public void testDependencyAnalysis54f() { runTest("parfor54f.dml", true); }
+	
+	@Test
 	public void testDependencyAnalysis55a() { runTest("parfor55a.dml", false); }
 	
 	@Test
diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListResultVarsTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListFrameResultVarsTest.java
similarity index 75%
rename from src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListResultVarsTest.java
rename to src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListFrameResultVarsTest.java
index fc952e1..a206781 100644
--- a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListResultVarsTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListFrameResultVarsTest.java
@@ -25,16 +25,18 @@ import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 
-public class ParForListResultVarsTest extends AutomatedTestBase 
+public class ParForListFrameResultVarsTest extends AutomatedTestBase 
 {
 	private final static String TEST_DIR = "functions/parfor/";
 	private final static String TEST_NAME1 = "parfor_listResults";
-	private final static String TEST_CLASS_DIR = TEST_DIR + ParForListResultVarsTest.class.getSimpleName() + "/";
+	private final static String TEST_NAME2 = "parfor_frameResults";
+	
+	private final static String TEST_CLASS_DIR = TEST_DIR + ParForListFrameResultVarsTest.class.getSimpleName() + "/";
 	
 	@Override
 	public void setUp() {
-		addTestConfiguration(TEST_NAME1, 
-			new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"R"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[]{"R"}));
 	}
 
 	@Test
@@ -47,11 +49,21 @@ public class ParForListResultVarsTest extends AutomatedTestBase
 		runListResultVarTest(TEST_NAME1, 35, 10);
 	}
 	
+	@Test
+	public void testParForFrameResult1a() {
+		runListResultVarTest(TEST_NAME2, 2, 1);
+	}
+	
+	@Test
+	public void testParForFrameResult1b() {
+		runListResultVarTest(TEST_NAME2, 35, 10);
+	}
+	
 	private void runListResultVarTest(String testName, int rows, int cols) {
 		loadTestConfiguration(getTestConfiguration(testName));
 		
 		String HOME = SCRIPT_DIR + TEST_DIR;
-		fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+		fullDMLScriptName = HOME + testName + ".dml";
 		programArgs = new String[]{"-explain","-args",
 			String.valueOf(rows), String.valueOf(cols), output("R") };
 
diff --git a/src/test/scripts/component/parfor/parfor54e.dml b/src/test/scripts/component/parfor/parfor54e.dml
new file mode 100644
index 0000000..70837e9
--- /dev/null
+++ b/src/test/scripts/component/parfor/parfor54e.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+A = rbind(as.frame("a"), as.frame("b"), as.frame("c"));
+parfor( i in 1:nrow(A) )
+  A[i,1] = as.frame(as.scalar(A[i,1])+"-"+i);
+print(toString(A));
diff --git a/src/test/scripts/component/parfor/parfor54f.dml b/src/test/scripts/component/parfor/parfor54f.dml
new file mode 100644
index 0000000..23bcf44
--- /dev/null
+++ b/src/test/scripts/component/parfor/parfor54f.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+A = rbind(as.frame("a"), as.frame("b"), as.frame("c"));
+parfor( i in 1:nrow(A) )
+  A[i,1] = as.frame(as.scalar(A[1,1])+"-"+i);
+print(toString(A));
diff --git a/src/test/scripts/functions/parfor/parfor_frameResults.dml b/src/test/scripts/functions/parfor/parfor_frameResults.dml
new file mode 100644
index 0000000..b1a54be
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_frameResults.dml
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+F = as.frame(matrix(0,7,1));
+
+parfor(i in 1:nrow(F))
+  F[i,1] = as.frame(rowMeans(as.matrix(F[i]))+i);
+
+R1 = matrix(0,0,1)
+for(i in 1:length(F))
+  R1 = rbind(R1, as.matrix(F[i,1]));
+
+R = as.matrix(sum(R1==seq(1,7)));
+write(R, $3);

[systemds] 02/02: [SYSTEMDS-3115, 3120] Implements cleaning pipeline enumeration scripts

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 3d1ba3c3b53f48d971c358a99f7c8b3e0b609c2b
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Fri Sep 3 00:15:05 2021 +0200

    [SYSTEMDS-3115,3120] Implements cleaning pipeline enumeration scripts
    
    * Parfor parallelization of the logical pipeline enumeration
    * Various vectorizations in the correctTypos builtin
---
 scripts/builtin/correctTypos.dml               | 51 +++++---------------------
 scripts/pipelines/scripts/enumerateLogical.dml | 25 ++++++-------
 2 files changed, 21 insertions(+), 55 deletions(-)

diff --git a/scripts/builtin/correctTypos.dml b/scripts/builtin/correctTypos.dml
index 9f95a4d..45d3861 100644
--- a/scripts/builtin/correctTypos.dml
+++ b/scripts/builtin/correctTypos.dml
@@ -123,11 +123,7 @@ s_correctTypos = function(Frame[String] strings, Matrix[Double] nullMask, Double
           A = ascii_matrix[1:as.scalar(lengths[i,1]), i];
           B = ascii_matrix[1:as.scalar(lengths[j,1]), j];
           d = damerauLevenshteinDistanceBound(A, B, distance_threshold, FALSE);
-          if (d == -1) {
-            distance_matrix[i, j] = 42000;
-          } else {
-            distance_matrix[i, j] = d;
-          }
+          distance_matrix[i, j] = ifelse(d == -1, 42000, d);
         }
       }
     }
@@ -178,36 +174,26 @@ s_correctTypos = function(Frame[String] strings, Matrix[Double] nullMask, Double
   }
 }
 
-
 replaceStrings = function(String replacement, String to_replace, Frame[String] strings)
   return(Frame[String] strings) 
 {
-  for (i in 1:nrow(strings)) {
-    if (as.scalar(strings[i,]) == to_replace) {
-      strings[i,] = replacement;
-    }
-  }
+  strings = map(strings, "s -> s.equals(\""+to_replace+"\") ? \""+replacement+"\" : s");
 }
 
-
 insertOrIncrement = function(String str, Frame[Unknown] dict)
   return(Frame[Unknown] dict)
 {
   i = 1;
-  ret = FALSE;
   break = FALSE;
   while (i <= nrow(dict) & !break) {
     if (as.scalar(dict[i, 1]) == str) {
-      value = as.integer(as.scalar(dict[i, 2])) + 1;
-      dict[i, 2] = value;
-      contains = TRUE;
+      dict[i, 2] = as.frame(as.integer(as.scalar(dict[i, 2])) + 1);
       break = TRUE;
     }
     i = i + 1;
   }
-  if (!break) {
+  if (!break)
     dict = rbind(dict, cbind(as.frame(str), as.frame(1)));
-  }
 }
 
 
@@ -216,19 +202,11 @@ damerauLevenshteinDistanceBound = function(matrix[double] A, matrix[double] B, d
 
   dl_matrix = matrix(0, rows = length(A) + 1, cols = length(B) + 1);
   dl_matrix[length(A) + 1, length(B) + 1] = -1;
-
-  for (j in 2:length(B) + 1) {
-    dl_matrix[1, j] = j - 1;
-  }
-
+  dl_matrix[1, 2:(length(B)+1)] = t(seq(2,length(B)+1) - 1);
   dl_matrix[2, 1] = 1;
 
   for (j in 2:length(B) + 1) {
-    if (as.scalar(A[1]) == as.scalar(B[j - 1])) {
-      cost = 0;
-    } else {
-      cost = 1;
-    }
+    cost = as.integer(as.scalar(A[1]) != as.scalar(B[j - 1]))
     dl_matrix[2, j] = min(min(
       dl_matrix[2, j - 1] + 1, 
       dl_matrix[1, j] + 1),
@@ -241,23 +219,14 @@ damerauLevenshteinDistanceBound = function(matrix[double] A, matrix[double] B, d
     i += 1;
 
     dl_matrix[i, 1] = i - 1;
-
-    if (as.scalar(A[i - 1]) == as.scalar(B[1])) {
-      cost = 0;
-    } else {
-      cost = 1;
-    }
+    cost = as.integer(as.scalar(A[i - 1]) != as.scalar(B[1]))
     dl_matrix[i, 2] = min(min(
       dl_matrix[i - 1, 2] + 1, 
       dl_matrix[i, 1] + 1),
       dl_matrix[i - 1, 1] + cost);
 
     for (j in 3:length(B) + 1) {
-      if (as.scalar(A[i - 1]) == as.scalar(B[j - 1])) {
-        cost = 0;
-      } else {
-        cost = 1;
-      }
+      cost = as.integer(as.scalar(A[i - 1]) != as.scalar(B[j - 1]))
       if (as.scalar(A[i - 1]) == as.scalar(B[j - 2]) & as.scalar(A[i - 2]) == as.scalar(B[j - 1])) {
         dl_matrix[i, j] = min(min(
           dl_matrix[i, j - 1] + 1, 
@@ -272,9 +241,7 @@ damerauLevenshteinDistanceBound = function(matrix[double] A, matrix[double] B, d
       }
     }
 
-    if( min(dl_matrix[i - 1, ]) > bound & min(dl_matrix[i, ]) > bound) {
-      break_condition = TRUE;
-    }
+    break_condition = min(dl_matrix[i - 1, ]) > bound & min(dl_matrix[i, ]) > bound;
   }
 
   if (is_verbose){
diff --git a/scripts/pipelines/scripts/enumerateLogical.dml b/scripts/pipelines/scripts/enumerateLogical.dml
index f894c4e..1133eb7 100644
--- a/scripts/pipelines/scripts/enumerateLogical.dml
+++ b/scripts/pipelines/scripts/enumerateLogical.dml
@@ -67,9 +67,9 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
   {
     physicalPipList = list()
     logicalPipList = list()
+    
     # # # get the physical instances from logical ones
-    for(i in 1:nrow(population))
-    { 
+    for(i in 1:nrow(population)) { 
       lv = as.integer(as.scalar(population[i, 1])) + 1
       lp = population[i, 2:lv]
       physicalConf = bandit::get_physical_configurations(lp, num_inst, primitives)
@@ -77,23 +77,22 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
       logicalPipList = append(logicalPipList, lp)
     }
     
-    scores = matrix(0, rows=length(physicalPipList), cols=1)
-    
     # # # execute the physical pipelines
-    for(i in 1:length(physicalPipList))
-    {
-      physicalConf = as.frame(physicalPipList[i])
-      lp = as.frame(logicalPipList[i])
+    scores = matrix(0, length(physicalPipList), 1)
+    # TODO better parfor-dep handling of multi-assignments to avoid check=0 
+    parfor(i in 1:length(physicalPipList), check=0) {
+      lp2 = as.frame(logicalPipList[i,1])
+      pp2 = as.frame(physicalPipList[i,1])
       # # append configuration keys for extracting the pipeline later on
-      id = seq(1, nrow(physicalConf))
-      physicalConf = cbind(as.frame(id), physicalConf)
+      id = seq(1, nrow(pp2))
+      idpp = cbind(as.frame(id), pp2)
 
       # # execute the physical instances and store the minimum scores, each pipeline is executed num_exec times
-      [outPip,outHp, feaFrameOuter] = bandit::run_with_hyperparam(lp, physicalConf, num_exec, X, y, Xtest, ytest, metaList,
+      [outPip, outHp, feaFrameOuter] = bandit::run_with_hyperparam(lp2, idpp, num_exec, X, y, Xtest, ytest, metaList,
         evaluationFunc, evalFunHp, param, as.frame(""), cv, cvk, verbose)
       # # sort the configurations groupwise
-      max_perf =  bandit::getMaxPerConf(outPip, nrow(physicalConf)) 
-      scores[i] = as.matrix(max_perf[1, 1])
+      max_perf = bandit::getMaxPerConf(outPip, nrow(pp2)) 
+      scores[i,1] = as.matrix(max_perf[1,1])
     }
     
     # # select parents and best score