You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/07/17 10:37:50 UTC

[systemds] branch master updated: [SYSTEMDS-3065] Close SparkContext silently

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 810ae1f  [SYSTEMDS-3065] Close SparkContext silently
810ae1f is described below

commit 810ae1f7009085d58e32e6e493fdf7be100f2780
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Sat Jul 17 12:33:23 2021 +0200

    [SYSTEMDS-3065] Close SparkContext silently
    
    When executing sequences of Spark instructions, the Spark context
    sometimes throws errors while it is being closed, after all results
    have been correctly calculated and returned.
    
    These errors are known to be inconsequential and therefore only confuse
    the user. This commit suppresses these error messages; the suppression is
    limited to messages produced while closing the Spark context after
    execution is done.
---
 .../context/SparkExecutionContext.java             | 47 +++++++++++++++-------
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index cfaa50e..1f6c275 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -19,9 +19,22 @@
 
 package org.apache.sysds.runtime.controlprogram.context;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -49,8 +62,8 @@ import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.data.TensorIndexes;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
@@ -78,16 +91,8 @@ import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.runtime.util.UtilFunctions;
 import org.apache.sysds.utils.MLContextProxy;
 import org.apache.sysds.utils.Statistics;
-import scala.Tuple2;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
+import scala.Tuple2;
 
 
 public class SparkExecutionContext extends ExecutionContext
@@ -161,12 +166,24 @@ public class SparkExecutionContext extends ExecutionContext
 	}
 
 	public void close() {
-		synchronized( SparkExecutionContext.class ) {
-			if( _spctx != null ) {
-				//stop the spark context if existing
+		synchronized( SparkExecutionContext.class) {
+			if(_spctx != null) {
+				Logger spL = Logger.getLogger("org.apache.spark.network.client.TransportResponseHandler");
+				spL.setLevel(Level.FATAL);
+				OutputStream buff = new OutputStream() {
+					@Override
+					public void write(int b) {
+						// do Nothing
+					}
+				};
+				PrintStream old = System.err;
+				System.setErr(new PrintStream(buff));
+				// stop the spark context if existing
 				_spctx.stop();
-				//make sure stopped context is never used again
+				// make sure stopped context is never used again
 				_spctx = null;
+				System.setErr(old);
+				spL.setLevel(Level.ERROR);
 			}
 		}
 	}
@@ -1018,7 +1035,7 @@ public class SparkExecutionContext extends ExecutionContext
 			List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
 
 			if( list.size()>1 )
-				throw new DMLRuntimeException("Expecting no more than one result block.");
+				throw new DMLRuntimeException("Expecting no more than one result block but got: " + list.size());
 			else if( list.size()==1 )
 				out = list.get(0)._2();
 			else //empty (e.g., after ops w/ outputEmpty=false)