You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/18 08:16:03 UTC

incubator-systemml git commit: [SYSTEMML-1281] Fix memory efficiency csv/dataset-binary rdd converters

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 132a43d38 -> c360304eb


[SYSTEMML-1281] Fix memory efficiency csv/dataset-binary rdd converters

This patch makes the following two improvements wrt memory efficiency to
our csv-binaryblock and dataset-binaryblock matrix rdd converters:

(1) Shallow block copy on createCombiner for combineByKey to reduce the
temporary memory consumption per task, especially for scenarios with
almost no local aggregation. The shallow copy is safe, as the inputs are
temporary partial blocks that are not accessible to any other operation. 

(2) Explicitly controlled number of output partitions according to its
size in binary block representation. So far we simply used the number of
input partitions. For compressed dataset inputs this lead to
unnecessarily large output partitions, and thus creating memory pressure
for subsequent tasks. For csv inputs, this lead to unnecessarily small
output partitions that are later coalesce to the preferred number of
partitions - however since coalesce only balances the number of merged
partitions, this could lead to load imbalance. Both problems are now
systematically solved at its root cause.  

Finally, this patch also includes a number of minor cleanups with regard
to probing the number of partitions, missing tests, etc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c360304e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c360304e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c360304e

Branch: refs/heads/master
Commit: c360304ebbf2d05d25067bbded49a3fbdb1edaac
Parents: 132a43d
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 17 23:41:55 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Feb 17 23:56:12 2017 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |  5 ++-
 .../instructions/gpu/context/JCudaContext.java  |  2 +-
 .../spark/CheckpointSPInstruction.java          | 18 ++---------
 .../instructions/spark/MapmmSPInstruction.java  |  4 +--
 .../spark/utils/RDDAggregateUtils.java          | 33 +++++++++++++++++---
 .../spark/utils/RDDConverterUtils.java          | 20 ++++++------
 .../instructions/spark/utils/SparkUtils.java    | 22 +++++++------
 .../functions/io/csv/ReadCSVTest.java           | 29 ++++++++++++++---
 8 files changed, 83 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 77bcc8d..b7fe6e8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -49,7 +49,6 @@ import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.instructions.spark.CheckpointSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.SPInstruction;
 import org.apache.sysml.runtime.instructions.spark.data.BroadcastObject;
 import org.apache.sysml.runtime.instructions.spark.data.LineageObject;
@@ -1181,8 +1180,8 @@ public class SparkExecutionContext extends ExecutionContext
 					((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
 			
 			//investigate issue of unnecessarily large number of partitions
-			int numPartitions = CheckpointSPInstruction.getNumCoalescePartitions(mcIn, in);
-			if( numPartitions < in.partitions().size() )
+			int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
+			if( numPartitions < in.getNumPartitions() )
 				in = in.coalesce( numPartitions );
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
index af4cfbd..38f4e4c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
@@ -133,7 +133,7 @@ public class JCudaContext extends GPUContext {
 			long free[] = {0};
 			long total[] = {0};
 			if (cudaMemGetInfo(free, total) == cudaSuccess) {
-				long totalNumBytes = total[0];
+				//long totalNumBytes = total[0];
 				deviceMemBytes.set(free[0]);
 			} else {
 				throw new RuntimeException("ERROR: Unable to get memory information of the GPU.");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 952e43c..1fa30b6 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -27,7 +27,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.BooleanObject;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
@@ -103,8 +102,8 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 		{
 			//(trigger coalesce if intended number of partitions exceeded by 20%
 			//and not hash partitioned to avoid losing the existing partitioner)
-			int numPartitions = getNumCoalescePartitions(mcIn, in);
-			boolean coalesce = ( 1.2*numPartitions < in.partitions().size() 
+			int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
+			boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions() 
 					&& !SparkUtils.isHashPartitioned(in) );
 			
 			//checkpoint pre-processing rdd operations
@@ -157,17 +156,4 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 		}
 		sec.setVariable( output.getName(), cd);
 	}
-
-	public static int getNumCoalescePartitions(MatrixCharacteristics mc, JavaPairRDD<?,?> in)
-	{
-		if( mc.dimsKnown(true) ) {
-			double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
-			double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
-			return (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
-		}
-		else {
-			return in.partitions().size();
-		}
-	}
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 310664f..33eacbe 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -135,8 +135,8 @@ public class MapmmSPInstruction extends BinarySPInstruction
 		{
 			JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
 			if( requiresFlatMapFunction(_type, mcBc) ) {
-				if( requiresRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()) )
-					in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()));
+				if( requiresRepartitioning(_type, mcRdd, mcBc, in1.getNumPartitions()) )
+					in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.getNumPartitions()));
 				out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2) );
 			}
 			else if( preservesPartitioning(mcRdd, _type) )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 8038157..61c950a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -155,13 +155,30 @@ public class RDDAggregateUtils
 	 * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
 	 * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
 	 */
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in ) {
+		return mergeByKey(in, in.getNumPartitions(), true);
+	}
+	
+	/**
+	 * Merges disjoint data of all blocks per key.
+	 * 
+	 * Note: The behavior of this method is undefined for both sparse and dense data if the 
+	 * assumption of disjoint data is violated.
+	 * 
+	 * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
+	 * @param numPartitions number of output partitions
+	 * @param deepCopyCombiner indicator if the createCombiner functions needs to deep copy the input block
+	 * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
+	 */
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
+			int numPartitions, boolean deepCopyCombiner )
 	{
 		//use combine by key to avoid unnecessary deep block copies, i.e.
 		//create combiner block once and merge remaining blocks in-place.
- 		return in.combineByKey( new CreateBlockCombinerFunction(), 
+ 		return in.combineByKey( 
+ 				new CreateBlockCombinerFunction(deepCopyCombiner), 
 			    new MergeBlocksFunction(false), 
-			    new MergeBlocksFunction(false) );
+			    new MergeBlocksFunction(false), numPartitions );
 	}
 	
 	/**
@@ -251,13 +268,19 @@ public class RDDAggregateUtils
 	private static class CreateBlockCombinerFunction implements Function<MatrixBlock, MatrixBlock> 
 	{
 		private static final long serialVersionUID = 1987501624176848292L;
-
+		
+		private final boolean _deep;
+		
+		public CreateBlockCombinerFunction(boolean deep) {
+			_deep = deep;
+		}
+		
 		@Override
 		public MatrixBlock call(MatrixBlock arg0) 
 			throws Exception 
 		{
 			//create deep copy of given block
-			return new MatrixBlock(arg0);
+			return _deep ? new MatrixBlock(arg0) : arg0;
 		}	
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index e847471..d1e6793 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -185,10 +185,11 @@ public class RDDConverterUtils
 				prepinput.mapPartitionsToPair(new CSVToBinaryBlockFunction(
 						mc, sparse, hasHeader, delim, fill, fillValue));
 		
-		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
-		
-		return out;
+		//aggregate partial matrix blocks (w/ preferred number of output 
+		//partitions as the data is likely smaller in binary block format,
+		//but also to bound the size of partitions for compressed inputs)
+		int parts = SparkUtils.getNumPreferredPartitions(mc, out);
+		return RDDAggregateUtils.mergeByKey(out, parts, false); 
 	}
 	
 	/**
@@ -256,10 +257,11 @@ public class RDDConverterUtils
 				prepinput.mapPartitionsToPair(
 					new DataFrameToBinaryBlockFunction(mc, sparse, containsID, isVector));
 		
-		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
-		
-		return out;
+		//aggregate partial matrix blocks (w/ preferred number of output 
+		//partitions as the data is likely smaller in binary block format,
+		//but also to bound the size of partitions for compressed inputs)
+		int parts = SparkUtils.getNumPreferredPartitions(mc, out);
+		return RDDAggregateUtils.mergeByKey(out, parts, false); 
 	}
 
 	public static Dataset<Row> binaryBlockToDataFrame(SparkSession sparkSession,
@@ -312,7 +314,7 @@ public class RDDConverterUtils
 		double datasize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
 		double rowsize = OptimizerUtils.estimatePartitionedSizeExactSparsity(1, mc.getCols(),
 				mc.getNumRowBlocks(), mc.getColsPerBlock(), Math.ceil((double)mc.getNonZeros()/mc.getRows()));
-		double partsize = Math.ceil(datasize/in.partitions().size());
+		double partsize = Math.ceil(datasize/in.getNumPartitions());
 		double blksz = Math.min(mc.getRows(), mc.getRowsPerBlock());
 		return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index d27e37a..2fe3981 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -29,8 +29,10 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
@@ -62,12 +64,10 @@ public class SparkUtils
 		return new Tuple2<MatrixIndexes,MatrixBlock>(in.getIndexes(), (MatrixBlock)in.getValue());
 	}
 
-	public static ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlock( ArrayList<IndexedMatrixValue> in )
-	{
+	public static ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlock( ArrayList<IndexedMatrixValue> in ) {
 		ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
 		for( IndexedMatrixValue imv : in )
 			ret.add(fromIndexedMatrixBlock(imv));
-		
 		return ret;
 	}
 
@@ -75,12 +75,10 @@ public class SparkUtils
 		return new Pair<MatrixIndexes,MatrixBlock>(in.getIndexes(), (MatrixBlock)in.getValue());
 	}
 
-	public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in )
-	{
+	public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in ) {
 		ArrayList<Pair<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
 		for( IndexedMatrixValue imv : in )
 			ret.add(fromIndexedMatrixBlockToPair(imv));
-		
 		return ret;
 	}
 
@@ -88,12 +86,10 @@ public class SparkUtils
 		return new Tuple2<Long, FrameBlock>(in.getKey(), in.getValue());
 	}
 
-	public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( ArrayList<Pair<Long, FrameBlock>> in )
-	{
+	public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( ArrayList<Pair<Long, FrameBlock>> in ) {
 		ArrayList<Tuple2<Long, FrameBlock>> ret = new ArrayList<Tuple2<Long, FrameBlock>>();
 		for( Pair<Long, FrameBlock> ifv : in )
 			ret.add(fromIndexedFrameBlock(ifv));
-		
 		return ret;
 	}
 
@@ -120,6 +116,14 @@ public class SparkUtils
 			&& in.rdd().partitioner().get() instanceof HashPartitioner;
 	}
 	
+	public static int getNumPreferredPartitions(MatrixCharacteristics mc, JavaPairRDD<?,?> in) {
+		if( !mc.dimsKnown(true) )
+			return in.getNumPartitions();
+		double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
+		double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
+		return (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
+	}
+	
 	/**
 	 * Creates a partitioning-preserving deep copy of the input matrix RDD, where 
 	 * the indexes and values are copied.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java b/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
index 65fbfa0..12d07ef 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.test.integration.functions.io.csv;
 
 import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.CompilerConfig;
 import org.apache.sysml.test.integration.AutomatedTestBase;
@@ -37,7 +38,6 @@ import org.apache.sysml.test.utils.TestUtils;
 
 public class ReadCSVTest extends AutomatedTestBase 
 {
-	
 	private final static String TEST_NAME = "ReadCSVTest";
 	private final static String TEST_DIR = "functions/io/csv/";
 	private final static String TEST_CLASS_DIR = TEST_DIR + ReadCSVTest.class.getSimpleName() + "/";
@@ -78,6 +78,11 @@ public class ReadCSVTest extends AutomatedTestBase
 	}
 	
 	@Test
+	public void testCSV1_SP() {
+		runCSVTest(1, RUNTIME_PLATFORM.SPARK, true);
+	}
+	
+	@Test
 	public void testCSV2_Sequential_CP1() {
 		runCSVTest(2, RUNTIME_PLATFORM.SINGLE_NODE, false);
 	}
@@ -101,6 +106,11 @@ public class ReadCSVTest extends AutomatedTestBase
 	public void testCSV2_MR() {
 		runCSVTest(2, RUNTIME_PLATFORM.HADOOP, true);
 	}
+	
+	@Test
+	public void testCSV2_SP() {
+		runCSVTest(2, RUNTIME_PLATFORM.SPARK, true);
+	}
 
 	@Test
 	public void testCSV3_Sequential_CP1() {
@@ -127,6 +137,11 @@ public class ReadCSVTest extends AutomatedTestBase
 		runCSVTest(3, RUNTIME_PLATFORM.HADOOP, false);
 	}
 	
+	@Test
+	public void testCSV3_SP() {
+		runCSVTest(3, RUNTIME_PLATFORM.SPARK, false);
+	}
+	
 	/**
 	 * 
 	 * @param testNumber
@@ -135,13 +150,17 @@ public class ReadCSVTest extends AutomatedTestBase
 	 */
 	private void runCSVTest(int testNumber, RUNTIME_PLATFORM platform, boolean parallel) 
 	{
-		
 		RUNTIME_PLATFORM oldPlatform = rtplatform;
+		rtplatform = platform;
+		
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		
 		boolean oldpar = CompilerConfig.FLAG_PARREADWRITE_TEXT;
 		
 		try
 		{
-			rtplatform = platform;
 			CompilerConfig.FLAG_PARREADWRITE_TEXT = parallel;
 			
 			TestConfiguration config = getTestConfiguration(TEST_NAME);
@@ -167,10 +186,10 @@ public class ReadCSVTest extends AutomatedTestBase
 			
 			TestUtils.compareScalars(dmlScalar, rScalar, eps);
 		}
-		finally
-		{
+		finally {
 			rtplatform = oldPlatform;
 			CompilerConfig.FLAG_PARREADWRITE_TEXT = oldpar;		
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
 		}
 	}