You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2016/01/31 19:38:28 UTC
incubator-systemml git commit: [SYSTEMML-492] Remove flex scheduler config
Repository: incubator-systemml
Updated Branches:
refs/heads/master 2bcfdc6c9 -> 290359f96
[SYSTEMML-492] Remove flex scheduler config
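Also replace hard-coded "mapred.task.id" lookups with the new MRConfigurationNames.MR_TASK_ATTEMPT_ID constant, which resolves to "mapreduce.task.attempt.id" (or to "mapred.task.id" under the legacy configuration names) ([SYSTEMML-476]).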
Closes #59.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/290359f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/290359f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/290359f9
Branch: refs/heads/master
Commit: 290359f9622138117bc047997bb01ded81a29e91
Parents: 2bcfdc6
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Sun Jan 31 10:33:52 2016 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Sun Jan 31 10:33:52 2016 -0800
----------------------------------------------------------------------
.../runtime/controlprogram/ParForProgramBlock.java | 1 -
.../parfor/DataPartitionerRemoteMR.java | 8 --------
.../runtime/controlprogram/parfor/RemoteParForMR.java | 10 ----------
.../controlprogram/parfor/ResultMergeRemoteMR.java | 9 ---------
.../apache/sysml/runtime/matrix/mapred/GMRMapper.java | 2 +-
.../runtime/matrix/mapred/MRConfigurationNames.java | 3 +++
.../sysml/runtime/matrix/mapred/ReduceBase.java | 3 +--
.../apache/sysml/runtime/transform/GTFMTDMapper.java | 6 +++---
.../org/apache/sysml/runtime/util/MapReduceTool.java | 14 +++++++-------
9 files changed, 15 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 83fcc0f..5d5b4ca 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -213,7 +213,6 @@ public class ParForProgramBlock extends ForProgramBlock
public static final boolean ALLOW_NESTED_PARALLELISM = true; // if not, transparently change parfor to for on program conversions (local,remote)
public static boolean ALLOW_REUSE_MR_JVMS = true; // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1
public static boolean ALLOW_REUSE_MR_PAR_WORKER = ALLOW_REUSE_MR_JVMS; //potential benefits: less initialization, reuse in-memory objects and result consolidation!
- public static final boolean USE_FLEX_SCHEDULER_CONF = false;
public static final boolean USE_PARALLEL_RESULT_MERGE = false; // if result merge is run in parallel or serial
public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // if remote result merge should be run in parallel for multiple result vars
public static final boolean ALLOW_DATA_COLOCATION = true;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
index 5a679a1..0d45af0 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
@@ -155,14 +155,6 @@ public class DataPartitionerRemoteMR extends DataPartitioner
}
job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) );
- //use FLEX scheduler configuration properties
- /*if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
- {
- job.setInt("flex.map.min", 0);
- job.setInt("flex.map.max", _numMappers);
- job.setInt("flex.reduce.min", 0);
- job.setInt("flex.reduce.max", _numMappers);
- }*/
//disable automatic tasks timeouts and speculative task exec
job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index 589157c..7253e34 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -150,16 +150,6 @@ public class RemoteParForMR
//job.setInt("mapred.tasktracker.tasks.maximum",1); //system property
//job.setInt("mapred.jobtracker.maxtasks.per.job",1); //system property
- //use FLEX scheduler configuration properties
- if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
- {
- job.setInt("flex.priority",0); //highest
-
- job.setInt("flex.map.min", 0);
- job.setInt("flex.map.max", numMappers);
- job.setInt("flex.reduce.min", 0);
- job.setInt("flex.reduce.max", numMappers);
- }
//set jvm memory size (if require)
String memKey = MRConfigurationNames.MR_CHILD_JAVA_OPTS;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index ede1849..1bdafae 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -35,7 +35,6 @@ import org.apache.sysml.api.DMLScript;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -283,14 +282,6 @@ public class ResultMergeRemoteMR extends ResultMerge
reducerGroups = Math.max((rlen*clen)/StagingFileUtils.CELL_BUFFER_SIZE, 1);
job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) );
- //use FLEX scheduler configuration properties
- if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
- {
- job.setInt("flex.map.min", 0);
- job.setInt("flex.map.max", _numMappers);
- job.setInt("flex.reduce.min", 0);
- job.setInt("flex.reduce.max", _numMappers);
- }
//disable automatic tasks timeouts and speculative task exec
job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java
index be7dc63..f31dee6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GMRMapper.java
@@ -181,7 +181,7 @@ implements Mapper<Writable, Writable, Writable, Writable>
{
super.configure(job);
- mapperID = job.get("mapred.task.id");
+ mapperID = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
dimsUnknownFilePrefix = job.get("dims.unknown.file.prefix");
_filterEmptyInputBlocks = allowsFilterEmptyInputBlocks();
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
index c2996c2..dc9343d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
@@ -72,6 +72,7 @@ public abstract class MRConfigurationNames {
// NOTE: mapreduce.reduce.java.opts commented out in mapred-default.xml so as to not override mapred.child.java.opts
public static final String MR_REDUCE_JAVA_OPTS;
public static final String MR_REDUCE_MEMORY_MB; // mapred-default.xml
+ public static final String MR_TASK_ATTEMPT_ID; // N/A
public static final String MR_TASK_ID; // N/A
public static final String MR_TASK_IO_SORT_MB; // mapred-default.xml
public static final String MR_TASK_TIMEOUT; // N/A
@@ -110,6 +111,7 @@ public abstract class MRConfigurationNames {
MR_REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
MR_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
MR_REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
+ MR_TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
MR_TASK_ID = "mapreduce.task.id";
MR_TASK_IO_SORT_MB = "mapreduce.task.io.sort.mb";
MR_TASK_TIMEOUT = "mapreduce.task.timeout";
@@ -140,6 +142,7 @@ public abstract class MRConfigurationNames {
MR_REDUCE_INPUT_BUFFER_PERCENT = "mapred.job.reduce.input.buffer.percent";
MR_REDUCE_JAVA_OPTS = "mapred.reduce.child.java.opts";
MR_REDUCE_MEMORY_MB = "mapred.job.reduce.memory.mb";
+ MR_TASK_ATTEMPT_ID = "mapred.task.id";
MR_TASK_ID = "mapred.tip.id";
MR_TASK_IO_SORT_MB = "io.sort.mb";
MR_TASK_TIMEOUT = "mapred.task.timeout";
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java
index 7462f8a..d6e3939 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReduceBase.java
@@ -29,7 +29,6 @@ import java.util.Iterator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.functionobjects.Plus;
@@ -100,7 +99,7 @@ public class ReduceBase extends MRBaseForCommonInstructions
{
super.configure(job);
- reducerID = job.get("mapred.task.id");
+ reducerID = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
dimsUnknownFilePrefix = job.get("dims.unknown.file.prefix");
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
index 4e3ece5..3409ee4 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.wink.json4j.JSONException;
-
import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{
@@ -51,7 +51,7 @@ public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, Dis
*/
@Override
public void configure(JobConf job) {
- String[] parts = job.get("mapred.task.id").split("_");
+ String[] parts = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID).split("_");
if ( parts[0].equalsIgnoreCase("task")) {
_mapTaskID = Integer.parseInt(parts[parts.length-1]);
}
@@ -59,7 +59,7 @@ public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, Dis
_mapTaskID = Integer.parseInt(parts[parts.length-2]);
}
else {
- throw new RuntimeException("Unrecognized format for taskID: " + job.get("mapred.task.id"));
+ throw new RuntimeException("Unrecognized format for taskID: " + job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID));
}
try {
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/290359f9/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index b0089ad..a142a2d 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.parser.DataExpression;
import org.apache.sysml.parser.Expression.ValueType;
@@ -52,6 +51,7 @@ import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.sort.ReadWithZeros;
import org.apache.wink.json4j.OrderedJSONObject;
@@ -68,10 +68,10 @@ public class MapReduceTool
public static String getUniqueKeyPerTask(JobConf job, boolean inMapper) {
//TODO: investigate ID pattern, required for parallel jobs
- /*String nodePrefix = job.get("mapred.task.id");
+ /*String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
return String.valueOf(IDHandler.extractLongID(nodePrefix));*/
- String nodePrefix = job.get("mapred.task.id");
+ String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
int i;
if (inMapper)
i = nodePrefix.indexOf("_m_");
@@ -85,7 +85,7 @@ public class MapReduceTool
@Deprecated
public static String getUniqueKeyPerTaskWithLeadingZros(JobConf job, boolean inMapper) {
- String nodePrefix = job.get("mapred.task.id");
+ String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
int i;
if (inMapper)
i = nodePrefix.indexOf("_m_");
@@ -99,10 +99,10 @@ public class MapReduceTool
public static int getUniqueTaskId(JobConf job) {
//TODO: investigate ID pattern, required for parallel jobs
- /*String nodePrefix = job.get("mapred.task.id");
+ /*String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
return IDHandler.extractIntID(nodePrefix);*/
- String nodePrefix = job.get("mapred.task.id");
+ String nodePrefix = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
int j = nodePrefix.lastIndexOf("_");
int i=nodePrefix.lastIndexOf("_", j-1);
nodePrefix = nodePrefix.substring(i+1, j);
@@ -111,7 +111,7 @@ public class MapReduceTool
}
public static String getGloballyUniqueName(JobConf job) {
- return job.get("mapred.task.id");
+ return job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
}
public static boolean existsFileOnHDFS(String fname){