You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2016/01/31 19:38:28 UTC

incubator-systemml git commit: [SYSTEMML-492] Remove flex scheduler config

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2bcfdc6c9 -> 290359f96


[SYSTEMML-492] Remove flex scheduler config

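Also replace hard-coded "mapred.task.id" lookups with the new MRConfigurationNames.MR_TASK_ATTEMPT_ID constant, which resolves to "mapreduce.task.attempt.id" or the legacy "mapred.task.id" depending on the configuration-name set in use ([SYSTEMML-476]).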

Closes #59.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/290359f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/290359f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/290359f9

Branch: refs/heads/master
Commit: 290359f9622138117bc047997bb01ded81a29e91
Parents: 2bcfdc6
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Sun Jan 31 10:33:52 2016 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Sun Jan 31 10:33:52 2016 -0800

----------------------------------------------------------------------
 .../runtime/controlprogram/ParForProgramBlock.java    |  1 -
 .../parfor/DataPartitionerRemoteMR.java               |  8 --------
 .../runtime/controlprogram/parfor/RemoteParForMR.java | 10 ----------
 .../controlprogram/parfor/ResultMergeRemoteMR.java    |  9 ---------
 .../apache/sysml/runtime/matrix/mapred/GMRMapper.java |  2 +-
 .../runtime/matrix/mapred/MRConfigurationNames.java   |  3 +++
 .../sysml/runtime/matrix/mapred/ReduceBase.java       |  3 +--
 .../apache/sysml/runtime/transform/GTFMTDMapper.java  |  6 +++---
 .../org/apache/sysml/runtime/util/MapReduceTool.java  | 14 +++++++-------
 9 files changed, 15 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 83fcc0f..5d5b4ca 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -213,7 +213,6 @@ public class ParForProgramBlock extends ForProgramBlock
 	public static final boolean ALLOW_NESTED_PARALLELISM	= true;    // if not, transparently change parfor to for on program conversions (local,remote)
 	public static       boolean ALLOW_REUSE_MR_JVMS         = true;    // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1
 	public static       boolean ALLOW_REUSE_MR_PAR_WORKER   = ALLOW_REUSE_MR_JVMS; //potential benefits: less initialization, reuse in-memory objects and result consolidation!
-	public static final boolean USE_FLEX_SCHEDULER_CONF     = false;
 	public static final boolean USE_PARALLEL_RESULT_MERGE   = false;    // if result merge is run in parallel or serial 
 	public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // if remote result merge should be run in parallel for multiple result vars
 	public static final boolean ALLOW_DATA_COLOCATION       = true;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
index 5a679a1..0d45af0 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
@@ -155,14 +155,6 @@ public class DataPartitionerRemoteMR extends DataPartitioner
 		    }
 		    job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) ); 	
 
-			//use FLEX scheduler configuration properties
-			/*if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
-			{
-				job.setInt("flex.map.min", 0);
-				job.setInt("flex.map.max", _numMappers);
-				job.setInt("flex.reduce.min", 0);
-				job.setInt("flex.reduce.max", _numMappers);
-			}*/
 			
 			//disable automatic tasks timeouts and speculative task exec
 			job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index 589157c..7253e34 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -150,16 +150,6 @@ public class RemoteParForMR
 			//job.setInt("mapred.tasktracker.tasks.maximum",1); //system property
 			//job.setInt("mapred.jobtracker.maxtasks.per.job",1); //system property
 
-			//use FLEX scheduler configuration properties
-			if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
-			{
-				job.setInt("flex.priority",0); //highest
-				
-				job.setInt("flex.map.min", 0);
-				job.setInt("flex.map.max", numMappers);
-				job.setInt("flex.reduce.min", 0);
-				job.setInt("flex.reduce.max", numMappers);
-			}
 			
 			//set jvm memory size (if require)
 			String memKey = MRConfigurationNames.MR_CHILD_JAVA_OPTS;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index ede1849..1bdafae 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -35,7 +35,6 @@ import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -283,14 +282,6 @@ public class ResultMergeRemoteMR extends ResultMerge
 		    	reducerGroups = Math.max((rlen*clen)/StagingFileUtils.CELL_BUFFER_SIZE, 1);
 			job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) ); 	
 
-			//use FLEX scheduler configuration properties
-			if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
-			{
-				job.setInt("flex.map.min", 0);
-				job.setInt("flex.map.max", _numMappers);
-				job.setInt("flex.reduce.min", 0);
-				job.setInt("flex.reduce.max", _numMappers);
-			}
 			
 			//disable automatic tasks timeouts and speculative task exec
 			job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java
index be7dc63..f31dee6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java
@@ -181,7 +181,7 @@ implements Mapper<Writable, Writable, Writable, Writable>
 	{
 		super.configure(job);
 		
-		mapperID = job.get("mapred.task.id");
+		mapperID = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
 		dimsUnknownFilePrefix = job.get("dims.unknown.file.prefix");
 		
 		_filterEmptyInputBlocks = allowsFilterEmptyInputBlocks();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
index c2996c2..dc9343d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
@@ -72,6 +72,7 @@ public abstract class MRConfigurationNames {
 	// NOTE: mapreduce.reduce.java.opts commented out in mapred-default.xml so as to not override mapred.child.java.opts
 	public static final String MR_REDUCE_JAVA_OPTS;
 	public static final String MR_REDUCE_MEMORY_MB; // mapred-default.xml
+	public static final String MR_TASK_ATTEMPT_ID; // N/A
 	public static final String MR_TASK_ID; // N/A
 	public static final String MR_TASK_IO_SORT_MB; // mapred-default.xml
 	public static final String MR_TASK_TIMEOUT; // N/A
@@ -110,6 +111,7 @@ public abstract class MRConfigurationNames {
 			MR_REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
 			MR_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
 			MR_REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
+			MR_TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
 			MR_TASK_ID = "mapreduce.task.id";
 			MR_TASK_IO_SORT_MB = "mapreduce.task.io.sort.mb";
 			MR_TASK_TIMEOUT = "mapreduce.task.timeout";
@@ -140,6 +142,7 @@ public abstract class MRConfigurationNames {
 			MR_REDUCE_INPUT_BUFFER_PERCENT = "mapred.job.reduce.input.buffer.percent";
 			MR_REDUCE_JAVA_OPTS = "mapred.reduce.child.java.opts";
 			MR_REDUCE_MEMORY_MB = "mapred.job.reduce.memory.mb";
+			MR_TASK_ATTEMPT_ID = "mapred.task.id";
 			MR_TASK_ID = "mapred.tip.id";
 			MR_TASK_IO_SORT_MB = "io.sort.mb";
 			MR_TASK_TIMEOUT = "mapred.task.timeout";

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java
index 7462f8a..d6e3939 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java
@@ -29,7 +29,6 @@ import java.util.Iterator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.functionobjects.Plus;
@@ -100,7 +99,7 @@ public class ReduceBase extends MRBaseForCommonInstructions
 	{	
 		super.configure(job);
 		
-		reducerID = job.get("mapred.task.id");
+		reducerID = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
 		dimsUnknownFilePrefix = job.get("dims.unknown.file.prefix");
 
 		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
index 4e3ece5..3409ee4 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.wink.json4j.JSONException;
-
 import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 
 
 public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{
@@ -51,7 +51,7 @@ public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, Dis
 	 */
 	@Override
 	public void configure(JobConf job) {
-		String[] parts = job.get("mapred.task.id").split("_");
+		String[] parts = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID).split("_");
 		if ( parts[0].equalsIgnoreCase("task")) {
 			_mapTaskID = Integer.parseInt(parts[parts.length-1]);
 		}
@@ -59,7 +59,7 @@ public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, Dis
 			_mapTaskID = Integer.parseInt(parts[parts.length-2]);
 		}
 		else {
-			throw new RuntimeException("Unrecognized format for taskID: " + job.get("mapred.task.id"));
+			throw new RuntimeException("Unrecognized format for taskID: " + job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID));
 		}
 
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index b0089ad..a142a2d 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.DataExpression;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -52,6 +51,7 @@ import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.sort.ReadWithZeros;
 import org.apache.wink.json4j.OrderedJSONObject;
 
@@ -68,10 +68,10 @@ public class MapReduceTool
 	
 	public static String getUniqueKeyPerTask(JobConf job, boolean inMapper) {
 		//TODO: investigate ID pattern, required for parallel jobs
-		/*String nodePrefix = job.get("mapred.task.id");
+		/*String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
 		return String.valueOf(IDHandler.extractLongID(nodePrefix));*/
 		
-		String nodePrefix = job.get("mapred.task.id");
+		String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
 		int i;
 		if (inMapper)
 			i = nodePrefix.indexOf("_m_");
@@ -85,7 +85,7 @@ public class MapReduceTool
 	
 	@Deprecated
 	public static String getUniqueKeyPerTaskWithLeadingZros(JobConf job, boolean inMapper) {
-		String nodePrefix = job.get("mapred.task.id");
+		String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
 		int i;
 		if (inMapper)
 			i = nodePrefix.indexOf("_m_");
@@ -99,10 +99,10 @@ public class MapReduceTool
 	
 	public static int getUniqueTaskId(JobConf job) {
 		//TODO: investigate ID pattern, required for parallel jobs
-		/*String nodePrefix = job.get("mapred.task.id"); 
+		/*String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID); 
 		return IDHandler.extractIntID(nodePrefix);*/
 		
-		String nodePrefix = job.get("mapred.task.id");
+		String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
 		int j = nodePrefix.lastIndexOf("_");
 		int i=nodePrefix.lastIndexOf("_", j-1);
 		nodePrefix = nodePrefix.substring(i+1, j);
@@ -111,7 +111,7 @@ public class MapReduceTool
 	}
 
 	public static String getGloballyUniqueName(JobConf job) {
-		return job.get("mapred.task.id");
+		return job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
 	}
 
 	public static boolean existsFileOnHDFS(String fname){