You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2016/01/29 20:03:52 UTC
[1/2] incubator-systemml git commit: [SYSTEMML-476] Centralize Hadoop
props in MRConfigurationNames
Repository: incubator-systemml
Updated Branches:
refs/heads/master f5df08828 -> 2bcfdc6c9
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index a4cfaa6..786b111 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.wink.json4j.JSONException;
import org.apache.wink.json4j.JSONObject;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.parser.DataExpression;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -46,6 +45,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.matrix.CSVReblockMR;
import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.runtime.util.UtilFunctions;
@@ -103,7 +103,7 @@ public class TfUtils implements Serializable{
public static String getPartFileName(JobConf job) throws IOException {
FileSystem fs = FileSystem.get(job);
- Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
+ Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
return thisPath.toString();
}
@@ -369,7 +369,7 @@ public class TfUtils implements Serializable{
FileSystem fs;
fs = FileSystem.get(_rJob);
- Path thisPath=new Path(_rJob.get("map.input.file")).makeQualified(fs);
+ Path thisPath=new Path(_rJob.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
String thisfile=thisPath.toString();
Path smallestFilePath=new Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index be182cd..b0089ad 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -693,7 +693,7 @@ public class MapReduceTool
}
//NOTE: we depend on the configured umask, setting umask in job or fspermission has no effect
- //similarly setting dfs.datanode.data.dir.perm as no effect either.
+ //similarly setting MRConfigurationNames.DFS_DATANODE_DATA_DIR_PERM has no effect either.
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/yarn/DMLAppMasterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/yarn/DMLAppMasterUtils.java b/src/main/java/org/apache/sysml/yarn/DMLAppMasterUtils.java
index 4983c7f..084de46 100644
--- a/src/main/java/org/apache/sysml/yarn/DMLAppMasterUtils.java
+++ b/src/main/java/org/apache/sysml/yarn/DMLAppMasterUtils.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.hops.HopsException;
@@ -38,6 +37,7 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.Program;
import org.apache.sysml.runtime.controlprogram.ProgramBlock;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.yarn.ropt.ResourceConfig;
import org.apache.sysml.yarn.ropt.ResourceOptimizer;
import org.apache.sysml.yarn.ropt.YarnClusterConfig;
@@ -171,12 +171,12 @@ public class DMLAppMasterUtils
String memOpts = "-Xmx"+memMB+"m -Xms"+memMB+"m -Xmn"+(int)(memMB/10)+"m";
//set mapper heapsizes
- job.set( "mapreduce.map.java.opts", memOpts );
- job.set( "mapreduce.map.memory.mb", String.valueOf(DMLYarnClient.computeMemoryAllocation(memMB)) );
+ job.set( MRConfigurationNames.MR_MAP_JAVA_OPTS, memOpts );
+ job.set( MRConfigurationNames.MR_MAP_MEMORY_MB, String.valueOf(DMLYarnClient.computeMemoryAllocation(memMB)) );
//set reducer heapsizes
- job.set( "mapreduce.reduce.java.opts", memOpts );
- job.set( "mapreduce.reduce.memory.mb", String.valueOf(DMLYarnClient.computeMemoryAllocation(memMB)) );
+ job.set( MRConfigurationNames.MR_REDUCE_JAVA_OPTS, memOpts );
+ job.set( MRConfigurationNames.MR_REDUCE_MEMORY_MB, String.valueOf(DMLYarnClient.computeMemoryAllocation(memMB)) );
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java b/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java
index 191401c..6793ead 100644
--- a/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java
+++ b/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java
@@ -509,7 +509,7 @@ public class DMLYarnClient
//setup mapreduce appmaster environment (for robustness if not included in default environment)
//for example, by default HDP 2.2 did not include mapred client libraries in this configuration
- //note: we cannot use mapreduce.application.classpath because it refers to HDFS and $PWD that needs to be setup
+ //note: we cannot use MRConfigurationNames.MR_APPLICATION_CLASSPATH because it refers to HDFS and $PWD that needs to be setup
Map<String, String> env = System.getenv();
String mapred_home = null;
//get mapred home via alternative environment variables
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java b/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java
index 18a2dfc..42fb6f5 100644
--- a/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java
+++ b/src/main/java/org/apache/sysml/yarn/ropt/YarnClusterAnalyzer.java
@@ -680,9 +680,9 @@ public class YarnClusterAnalyzer
//handle jvm max mem (map mem budget is relevant for map-side distcache and parfor)
//(for robustness we probe both: child and map configuration parameters)
- String javaOpts1 = conf.get("mapred.child.java.opts"); //internally mapred/mapreduce synonym
- String javaOpts2 = conf.get("mapreduce.map.java.opts", null); //internally mapred/mapreduce synonym
- String javaOpts3 = conf.get("mapreduce.reduce.java.opts", null); //internally mapred/mapreduce synonym
+ String javaOpts1 = conf.get(MRConfigurationNames.MR_CHILD_JAVA_OPTS); //internally mapred/mapreduce synonym
+ String javaOpts2 = conf.get(MRConfigurationNames.MR_MAP_JAVA_OPTS, null); //internally mapred/mapreduce synonym
+ String javaOpts3 = conf.get(MRConfigurationNames.MR_REDUCE_JAVA_OPTS, null); //internally mapred/mapreduce synonym
if( javaOpts2 != null ) //specific value overrides generic
_remoteJVMMaxMemMap = extractMaxMemoryOpt(javaOpts2);
else
@@ -700,7 +700,7 @@ public class YarnClusterAnalyzer
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
maximumPhyAllocate = (long) 1024 * 1024 * conf.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- mrAMPhy = (long)conf.getInt("yarn.app.mapreduce.am.resource.mb", 1536) * 1024 * 1024;
+ mrAMPhy = (long)conf.getInt(MRConfigurationNames.YARN_APP_MR_AM_RESOURCE_MB, 1536) * 1024 * 1024;
} catch (Exception e) {
throw new RuntimeException("Unable to analyze yarn cluster ", e);
[2/2] incubator-systemml git commit: [SYSTEMML-476] Centralize Hadoop
props in MRConfigurationNames
Posted by de...@apache.org.
[SYSTEMML-476] Centralize Hadoop props in MRConfigurationNames
Closes #57.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2bcfdc6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2bcfdc6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2bcfdc6c
Branch: refs/heads/master
Commit: 2bcfdc6c9fdadcf5596b1d3cc25cf0e8dbb0ecc3
Parents: f5df088
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Fri Jan 29 10:53:47 2016 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Fri Jan 29 10:53:47 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/api/DMLScript.java | 6 +-
.../parfor/DataPartitionerRemoteMR.java | 12 +--
.../controlprogram/parfor/RemoteDPParForMR.java | 10 +-
.../parfor/RemoteDPParWorkerReducer.java | 4 +-
.../controlprogram/parfor/RemoteParForMR.java | 8 +-
.../parfor/RemoteParWorkerMapper.java | 8 +-
.../parfor/ResultMergeRemoteMR.java | 12 +--
.../parfor/ResultMergeRemoteMapper.java | 4 +-
.../parfor/stat/InfrastructureAnalyzer.java | 10 +-
.../runtime/io/BinaryBlockSerialization.java | 2 +-
.../sysml/runtime/io/WriterBinaryBlock.java | 14 +--
.../runtime/io/WriterBinaryBlockParallel.java | 8 +-
.../apache/sysml/runtime/matrix/CMCOVMR.java | 4 +-
.../sysml/runtime/matrix/CSVReblockMR.java | 6 +-
.../apache/sysml/runtime/matrix/CleanupMR.java | 4 +-
.../apache/sysml/runtime/matrix/CombineMR.java | 6 +-
.../apache/sysml/runtime/matrix/DataGenMR.java | 10 +-
.../org/apache/sysml/runtime/matrix/GMR.java | 6 +-
.../sysml/runtime/matrix/GroupedAggMR.java | 5 +-
.../org/apache/sysml/runtime/matrix/MMCJMR.java | 6 +-
.../org/apache/sysml/runtime/matrix/MMRJMR.java | 6 +-
.../apache/sysml/runtime/matrix/ReblockMR.java | 12 +--
.../org/apache/sysml/runtime/matrix/SortMR.java | 10 +-
.../apache/sysml/runtime/matrix/WriteCSVMR.java | 6 +-
.../matrix/data/UnPaddedOutputFormat.java | 3 +-
.../data/hadoopfix/DelegatingInputFormat.java | 7 +-
.../matrix/data/hadoopfix/MultipleInputs.java | 15 +--
.../matrix/mapred/CSVAssignRowIDMapper.java | 3 +-
.../runtime/matrix/mapred/CSVReblockMapper.java | 3 +-
.../matrix/mapred/MRConfigurationNames.java | 98 +++++++++++++++++---
.../matrix/mapred/MRJobConfiguration.java | 23 +++--
.../sysml/runtime/transform/ApplyTfBBMR.java | 4 +-
.../runtime/transform/ApplyTfBBMapper.java | 4 +-
.../sysml/runtime/transform/ApplyTfCSVMR.java | 4 +-
.../sysml/runtime/transform/GenTfMtdMR.java | 4 +-
.../apache/sysml/runtime/transform/TfUtils.java | 6 +-
.../sysml/runtime/util/MapReduceTool.java | 2 +-
.../apache/sysml/yarn/DMLAppMasterUtils.java | 10 +-
.../org/apache/sysml/yarn/DMLYarnClient.java | 2 +-
.../sysml/yarn/ropt/YarnClusterAnalyzer.java | 8 +-
40 files changed, 226 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/api/DMLScript.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java b/src/main/java/org/apache/sysml/api/DMLScript.java
index ce37bea..101b229 100644
--- a/src/main/java/org/apache/sysml/api/DMLScript.java
+++ b/src/main/java/org/apache/sysml/api/DMLScript.java
@@ -835,7 +835,7 @@ public class DMLScript
JobConf job = ConfigurationManager.getCachedJobConf();
boolean localMode = InfrastructureAnalyzer.isLocalMode(job);
String taskController = job.get(MRConfigurationNames.MR_TASKTRACKER_TASKCONTROLLER, "org.apache.hadoop.mapred.DefaultTaskController");
- String ttGroupName = job.get("mapreduce.tasktracker.group","null");
+ String ttGroupName = job.get(MRConfigurationNames.MR_TASKTRACKER_GROUP,"null");
String perm = job.get(MRConfigurationNames.DFS_PERMISSIONS_ENABLED,"null"); //note: job.get("dfs.permissions.supergroup",null);
URI fsURI = FileSystem.getDefaultUri(job);
@@ -851,8 +851,8 @@ public class DMLScript
+ "local.user.groups = " + ProgramConverter.serializeStringCollection(groupNames) + ", "
+ MRConfigurationNames.MR_JOBTRACKER_ADDRESS + " = " + job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS) + ", "
+ MRConfigurationNames.MR_TASKTRACKER_TASKCONTROLLER + " = " + taskController + ","
- + "mapreduce.tasktracker.group = " + ttGroupName + ", "
- + "fs.default.name = " + ((fsURI!=null) ? fsURI.getScheme() : "null") + ", "
+ + MRConfigurationNames.MR_TASKTRACKER_GROUP + " = " + ttGroupName + ", "
+ + MRConfigurationNames.FS_DEFAULTFS + " = " + ((fsURI!=null) ? fsURI.getScheme() : "null") + ", "
+ MRConfigurationNames.DFS_PERMISSIONS_ENABLED + " = " + perm );
//print warning if permission issues possible
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
index 3093377..5a679a1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
@@ -36,6 +35,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.utils.Statistics;
@@ -165,7 +165,7 @@ public class DataPartitionerRemoteMR extends DataPartitioner
}*/
//disable automatic tasks timeouts and speculative task exec
- job.setInt("mapred.task.timeout", 0);
+ job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
job.setMapSpeculativeExecution(false);
//set up preferred custom serialization framework for binary block format
@@ -177,11 +177,11 @@ public class DataPartitionerRemoteMR extends DataPartitioner
job.setNumTasksToExecutePerJvm(-1); //unlimited
//enables compression - not conclusive for different codecs (empirically good compression ratio, but significantly slower)
- //job.set("mapred.compress.map.output", "true");
- //job.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+ //job.set(MRConfigurationNames.MR_MAP_OUTPUT_COMPRESS, "true");
+ //job.set(MRConfigurationNames.MR_MAP_OUTPUT_COMPRESS_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
//set the replication factor for the results
- job.setInt("dfs.replication", _replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, _replication);
//set up map/reduce memory configurations (if in AM context)
DMLConfig config = ConfigurationManager.getConfig();
@@ -190,7 +190,7 @@ public class DataPartitionerRemoteMR extends DataPartitioner
//set the max number of retries per map task
// disabled job-level configuration to respect cluster configuration
// note: this refers to hadoop2, hence it never had effect on mr1
- //job.setInt("mapreduce.map.maxattempts", _max_retry);
+ //job.setInt(MRConfigurationNames.MR_MAP_MAXATTEMPTS, _max_retry);
//set unique working dir
MRJobConfiguration.setUniqueWorkingDir(job);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index 77d5282..4910023 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
@@ -55,6 +54,7 @@ import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.utils.Statistics;
@@ -149,10 +149,10 @@ public class RemoteDPParForMR
//set optimization parameters
//set the number of mappers and reducers
- job.setNumReduceTasks( numReducers );
+ job.setNumReduceTasks( numReducers );
//disable automatic tasks timeouts and speculative task exec
- job.setInt("mapred.task.timeout", 0);
+ job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
job.setMapSpeculativeExecution(false);
//set up preferred custom serialization framework for binary block format
@@ -167,11 +167,11 @@ public class RemoteDPParForMR
job.setNumTasksToExecutePerJvm( 1 ); //-1 for unlimited
//set the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set the max number of retries per map task
//note: currently disabled to use cluster config
- //job.setInt("mapreduce.map.maxattempts", max_retry);
+ //job.setInt(MRConfigurationNames.MR_MAP_MAXATTEMPTS, max_retry);
//set unique working dir
MRJobConfiguration.setUniqueWorkingDir(job);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index cb9fd06..2bcc1ad 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -46,6 +45,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
import org.apache.sysml.runtime.instructions.cp.IntObject;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.LocalFileUtils;
import org.apache.sysml.utils.Statistics;
@@ -151,7 +151,7 @@ public class RemoteDPParWorkerReducer extends ParWorker
_partition = new MatrixBlock((int)_rlen, _clen, false);
//Step 1: configure parworker
- String taskID = job.get("mapred.tip.id");
+ String taskID = job.get(MRConfigurationNames.MR_TASK_ID);
LOG.trace("configure RemoteDPParWorkerReducer "+taskID);
try
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index a17acb8..589157c 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -162,7 +162,7 @@ public class RemoteParForMR
}
//set jvm memory size (if require)
- String memKey = "mapred.child.java.opts";
+ String memKey = MRConfigurationNames.MR_CHILD_JAVA_OPTS;
if( minMem > 0 && minMem > InfrastructureAnalyzer.extractMaxMemoryOpt(job.get(memKey)) )
{
InfrastructureAnalyzer.setMaxMemoryOpt(job, memKey, minMem);
@@ -170,7 +170,7 @@ public class RemoteParForMR
}
//disable automatic tasks timeouts and speculative task exec
- job.setInt("mapred.task.timeout", 0);
+ job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
job.setMapSpeculativeExecution(false);
//set up map/reduce memory configurations (if in AM context)
@@ -185,12 +185,12 @@ public class RemoteParForMR
job.setInt(MRConfigurationNames.MR_TASK_IO_SORT_MB, 8); //8MB
//set the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set the max number of retries per map task
// disabled job-level configuration to respect cluster configuration
// note: this refers to hadoop2, hence it never had effect on mr1
- //job.setInt("mapreduce.map.maxattempts", max_retry);
+ //job.setInt(MRConfigurationNames.MR_MAP_MAXATTEMPTS, max_retry);
//set unique working dir
MRJobConfiguration.setUniqueWorkingDir(job);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
index fb259c4..7a48974 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
@@ -41,6 +40,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysml.runtime.instructions.cp.Data;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.LocalFileUtils;
import org.apache.sysml.utils.Statistics;
@@ -123,7 +123,7 @@ public class RemoteParWorkerMapper extends ParWorker //MapReduceBase not requir
public void configure(JobConf job)
{
boolean requiresConfigure = true;
- String jobID = job.get("mapred.job.id");
+ String jobID = job.get(MRConfigurationNames.MR_JOB_ID);
//probe cache for existing worker (parfor body, symbol table, etc)
if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER )
@@ -153,11 +153,11 @@ public class RemoteParWorkerMapper extends ParWorker //MapReduceBase not requir
if( requiresConfigure )
{
- LOG.trace("configure RemoteParWorkerMapper "+job.get("mapred.tip.id"));
+ LOG.trace("configure RemoteParWorkerMapper "+job.get(MRConfigurationNames.MR_TASK_ID));
try
{
- _stringID = job.get("mapred.tip.id"); //task ID
+ _stringID = job.get(MRConfigurationNames.MR_TASK_ID); //task ID
_workerID = IDHandler.extractIntID(_stringID); //int task ID
//use the given job configuration as source for all new job confs
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index 20aea33..ede1849 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
@@ -48,6 +47,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedMatrixBlock;
import org.apache.sysml.runtime.matrix.data.TaggedMatrixCell;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.LocalFileUtils;
import org.apache.sysml.runtime.util.MapReduceTool;
@@ -293,7 +293,7 @@ public class ResultMergeRemoteMR extends ResultMerge
}
//disable automatic tasks timeouts and speculative task exec
- job.setInt("mapred.task.timeout", 0);
+ job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
job.setMapSpeculativeExecution(false);
//set up preferred custom serialization framework for binary block format
@@ -305,16 +305,16 @@ public class ResultMergeRemoteMR extends ResultMerge
job.setNumTasksToExecutePerJvm(-1); //unlimited
//enables compression - not conclusive for different codecs (empirically good compression ratio, but significantly slower)
- //job.set("mapred.compress.map.output", "true");
- //job.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+ //job.set(MRConfigurationNames.MR_MAP_OUTPUT_COMPRESS, "true");
+ //job.set(MRConfigurationNames.MR_MAP_OUTPUT_COMPRESS_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
//set the replication factor for the results
- job.setInt("dfs.replication", _replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, _replication);
//set the max number of retries per map task
// disabled job-level configuration to respect cluster configuration
// note: this refers to hadoop2, hence it never had effect on mr1
- //job.setInt("mapreduce.map.maxattempts", _max_retry);
+ //job.setInt(MRConfigurationNames.MR_MAP_MAXATTEMPTS, _max_retry);
//set unique working dir
MRJobConfiguration.setUniqueWorkingDir(job);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
index 48306a5..c8f579a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
@@ -26,13 +26,13 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixCell;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.TaggedMatrixBlock;
import org.apache.sysml.runtime.matrix.data.TaggedMatrixCell;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.FastStringTokenizer;
import org.apache.sysml.runtime.util.UtilFunctions;
@@ -60,7 +60,7 @@ public class ResultMergeRemoteMapper
InputInfo ii = MRJobConfiguration.getResultMergeInputInfo(job);
long[] tmp = MRJobConfiguration.getResultMergeMatrixCharacteristics( job );
String compareFname = MRJobConfiguration.getResultMergeInfoCompareFilename(job);
- String currentFname = job.get("map.input.file");
+ String currentFname = job.get(MRConfigurationNames.MR_MAP_INPUT_FILE);
byte tag = 0;
//startsWith comparison in order to account for part names in currentFname
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
index 23fd697..3d595b8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
@@ -266,7 +266,7 @@ public class InfrastructureAnalyzer
// we explicitly probe the relevant properties instead of relying on results from
// analyzeHadoopCluster().
String jobTracker = job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS, "local");
- String framework = job.get("mapreduce.framework.name", "local");
+ String framework = job.get(MRConfigurationNames.MR_FRAMEWORK_NAME, "local");
boolean isYarnEnabled = (framework!=null && framework.equals("yarn"));
return ("local".equals(jobTracker) & !isYarnEnabled);
@@ -513,9 +513,9 @@ public class InfrastructureAnalyzer
//handle jvm max mem (map mem budget is relevant for map-side distcache and parfor)
//(for robustness we probe both: child and map configuration parameters)
- String javaOpts1 = job.get("mapred.child.java.opts"); //internally mapred/mapreduce synonym
- String javaOpts2 = job.get("mapreduce.map.java.opts", null); //internally mapred/mapreduce synonym
- String javaOpts3 = job.get("mapreduce.reduce.java.opts", null); //internally mapred/mapreduce synonym
+ String javaOpts1 = job.get(MRConfigurationNames.MR_CHILD_JAVA_OPTS); //internally mapred/mapreduce synonym
+ String javaOpts2 = job.get(MRConfigurationNames.MR_MAP_JAVA_OPTS, null); //internally mapred/mapreduce synonym
+ String javaOpts3 = job.get(MRConfigurationNames.MR_REDUCE_JAVA_OPTS, null); //internally mapred/mapreduce synonym
if( javaOpts2 != null ) //specific value overrides generic
_remoteJVMMaxMemMap = extractMaxMemoryOpt(javaOpts2);
else
@@ -530,7 +530,7 @@ public class InfrastructureAnalyzer
_blocksize = Long.parseLong(blocksize);
//is yarn enabled
- String framework = job.get("mapreduce.framework.name");
+ String framework = job.get(MRConfigurationNames.MR_FRAMEWORK_NAME);
_yarnEnabled = (framework!=null && framework.equals("yarn"));
//analyze if local mode (internally requires yarn_enabled)
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java b/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java
index e09b294..92b358d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java
+++ b/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java
@@ -33,7 +33,7 @@ import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
/**
* This custom serialization class can be used via
- * job.set("io.serializations", "org.apache.sysml.runtime.io.BinaryBlockSerialization");
+ * job.set(MRConfigurationNames.IO_SERIALIZATIONS, "org.apache.sysml.runtime.io.BinaryBlockSerialization");
*
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
index c9c8a37..54b011f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
@@ -34,6 +33,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.mapred.DistributedCacheInput;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
@@ -108,12 +108,12 @@ public class WriterBinaryBlock extends MatrixWriter
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
// 1) create sequence file writer, with right replication factor
- // (config via 'dfs.replication' not possible since sequence file internally calls fs.getDefaultReplication())
+ // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
SequenceFile.Writer writer = null;
if( replication > 0 ) //if replication specified (otherwise default)
{
//copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication
- writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, job.getInt("io.file.buffer.size", 4096),
+ writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
(short)replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata());
}
else
@@ -202,12 +202,12 @@ public class WriterBinaryBlock extends MatrixWriter
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
// 1) create sequence file writer, with right replication factor
- // (config via 'dfs.replication' not possible since sequence file internally calls fs.getDefaultReplication())
+ // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
SequenceFile.Writer writer = null;
if( replication > 0 ) //if replication specified (otherwise default)
{
//copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication
- writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, job.getInt("io.file.buffer.size", 4096),
+ writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
(short)replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata());
}
else
@@ -320,7 +320,7 @@ public class WriterBinaryBlock extends MatrixWriter
for( int k = 0; k<numBlocks; k+=numPartBlocks )
{
// 1) create sequence file writer, with right replication factor
- // (config via 'dfs.replication' not possible since sequence file internally calls fs.getDefaultReplication())
+ // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
Path path2 = new Path(path.toString()+File.separator+(++count));
SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path2, MatrixIndexes.class, MatrixBlock.class);
@@ -370,7 +370,7 @@ public class WriterBinaryBlock extends MatrixWriter
for( int k = 0; k<numBlocks; k+=numPartBlocks )
{
// 1) create sequence file writer, with right replication factor
- // (config via 'dfs.replication' not possible since sequence file internally calls fs.getDefaultReplication())
+ // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
Path path2 = new Path(path.toString()+File.separator+(++count));
SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path2, MatrixIndexes.class, MatrixBlock.class);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
index 83d6968..a950c88 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -39,6 +38,7 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
@@ -146,13 +146,13 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
public Object call() throws Exception
{
// 1) create sequence file writer, with right replication factor
- // (config via 'dfs.replication' not possible since sequence file internally calls fs.getDefaultReplication())
+ // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
SequenceFile.Writer writer = null;
if( _replication > 0 ) //if replication specified (otherwise default)
{
//copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication
- writer = new SequenceFile.Writer(_fs, _job, _path, MatrixIndexes.class, MatrixBlock.class, _job.getInt("io.file.buffer.size", 4096),
- (short)_replication, _fs.getDefaultBlockSize(), null, new SequenceFile.Metadata());
+ writer = new SequenceFile.Writer(_fs, _job, _path, MatrixIndexes.class, MatrixBlock.class, _job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
+ (short)_replication, _fs.getDefaultBlockSize(), null, new SequenceFile.Metadata());
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java b/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java
index 744ce88..86249c7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.matrix.data.CM_N_COVCell;
import org.apache.sysml.runtime.matrix.data.InputInfo;
@@ -35,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
import org.apache.sysml.runtime.matrix.mapred.CMCOVMRMapper;
import org.apache.sysml.runtime.matrix.mapred.CMCOVMRReducer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
@@ -85,7 +85,7 @@ public class CMCOVMR
MRJobConfiguration.setCM_N_COMInstructions(job, cmNcomInstructions);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up what matrices are needed to pass from the mapper to reducer
HashSet<Byte> mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, realIndexes, instructionsInMapper, null,
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
index e3f9df4..fc47ce3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
@@ -52,6 +51,7 @@ import org.apache.sysml.runtime.matrix.mapred.CSVAssignRowIDMapper;
import org.apache.sysml.runtime.matrix.mapred.CSVAssignRowIDReducer;
import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper;
import org.apache.sysml.runtime.matrix.mapred.CSVReblockReducer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
@@ -313,7 +313,7 @@ public class CSVReblockMR
MRJobConfiguration.setCSVReblockInstructions(job, reblockInstructions);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up the number of reducers
job.setNumReduceTasks(1);
@@ -401,7 +401,7 @@ public class CSVReblockMR
MRJobConfiguration.setInstructionsInReducer(job, otherInstructionsInReducer);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up preferred custom serialization framework for binary block format
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java b/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java
index ee0fd2e..8e743a7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java
@@ -40,11 +40,11 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NLineInputFormat;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.LocalFileUtils;
@@ -88,7 +88,7 @@ public class CleanupMR
writeCleanupTasksToFile(path, numNodes);
//disable automatic tasks timeouts and speculative task exec
- job.setInt("mapred.task.timeout", 0);
+ job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
job.setMapSpeculativeExecution(false);
/////
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java b/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java
index c660cf1..5940951 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
-
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.instructions.mr.CombineBinaryInstruction;
import org.apache.sysml.runtime.instructions.mr.CombineTernaryInstruction;
@@ -49,10 +48,11 @@ import org.apache.sysml.runtime.matrix.data.TaggedMatrixValue;
import org.apache.sysml.runtime.matrix.data.WeightedPair;
import org.apache.sysml.runtime.matrix.mapred.GMRMapper;
import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.matrix.mapred.ReduceBase;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
+import org.apache.sysml.runtime.matrix.mapred.ReduceBase;
import org.apache.sysml.runtime.util.UtilFunctions;
@@ -327,7 +327,7 @@ public class CombineMR
MRJobConfiguration.setCombineInstructions(job, combineInstructions);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up what matrices are needed to pass from the mapper to reducer
HashSet<Byte> mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, inputIndexes, null, null, combineInstructions,
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
index 0d34d0b..7d01d30 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
@@ -30,11 +30,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Group;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.lops.Lop;
@@ -44,18 +43,19 @@ import org.apache.sysml.runtime.instructions.MRInstructionParser;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.instructions.mr.DataGenMRInstruction;
import org.apache.sysml.runtime.instructions.mr.MRInstruction;
+import org.apache.sysml.runtime.instructions.mr.MRInstruction.MRINSTRUCTION_TYPE;
import org.apache.sysml.runtime.instructions.mr.RandInstruction;
import org.apache.sysml.runtime.instructions.mr.SeqInstruction;
-import org.apache.sysml.runtime.instructions.mr.MRInstruction.MRINSTRUCTION_TYPE;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.LibMatrixDatagen;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedMatrixBlock;
+import org.apache.sysml.runtime.matrix.mapred.DataGenMapper;
import org.apache.sysml.runtime.matrix.mapred.GMRCombiner;
import org.apache.sysml.runtime.matrix.mapred.GMRReducer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.matrix.mapred.DataGenMapper;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
import org.apache.sysml.runtime.util.MapReduceTool;
@@ -294,7 +294,7 @@ public class DataGenMR
MRJobConfiguration.setInstructionsInReducer(job, otherInstructionsInReducer);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up map/reduce memory configurations (if in AM context)
DMLConfig config = ConfigurationManager.getConfig();
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/GMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/GMR.java b/src/main/java/org/apache/sysml/runtime/matrix/GMR.java
index 43d59d5..524ab6c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/GMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/GMR.java
@@ -27,11 +27,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Group;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.lops.Lop;
@@ -57,6 +56,7 @@ import org.apache.sysml.runtime.matrix.mapred.GMRCombiner;
import org.apache.sysml.runtime.matrix.mapred.GMRMapper;
import org.apache.sysml.runtime.matrix.mapred.GMRReducer;
import org.apache.sysml.runtime.matrix.mapred.MRBaseForCommonInstructions;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
@@ -205,7 +205,7 @@ public class GMR
MRJobConfiguration.setInstructionsInReducer(job, otherInstructionsInReducer);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up preferred custom serialization framework for binary block format
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java b/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java
index 7a253f4..587c8a4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java
@@ -22,10 +22,10 @@ package org.apache.sysml.runtime.matrix;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Group;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -34,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.WeightedCell;
import org.apache.sysml.runtime.matrix.mapred.GroupedAggMRCombiner;
import org.apache.sysml.runtime.matrix.mapred.GroupedAggMRMapper;
import org.apache.sysml.runtime.matrix.mapred.GroupedAggMRReducer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.util.MapReduceTool;
@@ -90,7 +91,7 @@ public class GroupedAggMR
MRJobConfiguration.setNumReducers(job, numReducers, numReducers);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up what matrices are needed to pass from the mapper to reducer
MRJobConfiguration.setUpOutputIndexesForMapper(job, realIndexes, null, null,
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java b/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java
index ce3602f..39195f8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java
@@ -24,11 +24,10 @@ import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Group;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -42,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
import org.apache.sysml.runtime.matrix.mapred.MMCJMRMapper;
import org.apache.sysml.runtime.matrix.mapred.MMCJMRReducerWithAggregator;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
@@ -203,7 +203,7 @@ public class MMCJMR
MRJobConfiguration.setAggregateBinaryInstructions(job, aggBinInstrction);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up preferred custom serialization framework for binary block format
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java b/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java
index c942ed6..3832d21 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java
@@ -23,11 +23,10 @@ import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Group;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
@@ -38,6 +37,7 @@ import org.apache.sysml.runtime.matrix.data.TaggedMatrixCell;
import org.apache.sysml.runtime.matrix.data.TripleIndexes;
import org.apache.sysml.runtime.matrix.mapred.MMRJMRMapper;
import org.apache.sysml.runtime.matrix.mapred.MMRJMRReducer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
@@ -116,7 +116,7 @@ public class MMRJMR
MRJobConfiguration.setInstructionsInReducer(job, otherInstructionsInReducer);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up map/reduce memory configurations (if in AM context)
DMLConfig config = ConfigurationManager.getConfig();
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java b/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java
index 9ce3ff1..d28d7e4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java
@@ -24,11 +24,10 @@ import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Group;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -38,10 +37,11 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedAdaptivePartialBlock;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
import org.apache.sysml.runtime.matrix.mapred.ReblockMapper;
import org.apache.sysml.runtime.matrix.mapred.ReblockReducer;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
import org.apache.sysml.yarn.ropt.YarnClusterAnalyzer;
@@ -103,10 +103,10 @@ public class ReblockMR
MRJobConfiguration.setInstructionsInReducer(job, otherInstructionsInReducer);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//disable automatic tasks timeouts and speculative task exec
- job.setInt("mapred.task.timeout", 0);
+ job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
job.setMapSpeculativeExecution(false);
//set up preferred custom serialization framework for binary block format
@@ -132,7 +132,7 @@ public class ReblockMR
job.setNumReduceTasks(numRed);
//setup in-memory reduce buffers budget (reblock reducer dont need much memory)
- //job.set("mapred.job.reduce.input.buffer.percent", "0.70");
+ //job.set(MRConfigurationNames.MR_REDUCE_INPUT_BUFFER_PERCENT, "0.70");
// Print the complete instruction
if (LOG.isTraceEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
index 29512c9..218a95b 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
@@ -36,14 +36,13 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Group;
-
import org.apache.sysml.lops.Lop;
import org.apache.sysml.lops.SortKeys;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -57,6 +56,7 @@ import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.matrix.sort.CompactInputFormat;
@@ -65,9 +65,9 @@ import org.apache.sysml.runtime.matrix.sort.IndexSortComparable;
import org.apache.sysml.runtime.matrix.sort.IndexSortComparableDesc;
import org.apache.sysml.runtime.matrix.sort.IndexSortMapper;
import org.apache.sysml.runtime.matrix.sort.IndexSortReducer;
+import org.apache.sysml.runtime.matrix.sort.IndexSortStitchupMapper;
import org.apache.sysml.runtime.matrix.sort.IndexSortStitchupReducer;
import org.apache.sysml.runtime.matrix.sort.SamplingSortMRInputFormat;
-import org.apache.sysml.runtime.matrix.sort.IndexSortStitchupMapper;
import org.apache.sysml.runtime.matrix.sort.ValueSortMapper;
import org.apache.sysml.runtime.matrix.sort.ValueSortReducer;
import org.apache.sysml.runtime.util.MapReduceTool;
@@ -245,7 +245,7 @@ public class SortMR
DistributedCache.createSymlink(job);
//setup replication factor
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
MatrixCharacteristics[] s = new MatrixCharacteristics[1];
s[0] = new MatrixCharacteristics(rlen, clen, brlen, bclen);
@@ -403,7 +403,7 @@ public class SortMR
job.set(SORT_INDEXES_OFFSETS, Arrays.toString(cumsumCounts));
//setup replication factor
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set unique working dir
MRJobConfiguration.setUniqueWorkingDir(job);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java b/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
index 9d36bae..632db0c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
@@ -23,11 +23,10 @@ import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Group;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -40,6 +39,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
import org.apache.sysml.runtime.matrix.mapred.CSVWriteMapper;
import org.apache.sysml.runtime.matrix.mapred.CSVWriteReducer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.yarn.ropt.YarnClusterAnalyzer;
@@ -77,7 +77,7 @@ public class WriteCSVMR
MRJobConfiguration.setCSVWriteInstructions(job, csvWriteInstructions);
//set up the replication factor for the results
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up preferred custom serialization framework for binary block format
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java
index 9051a5b..3c4e59a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
public class UnPaddedOutputFormat<K extends Writable, V extends Writable> extends FileOutputFormat<K, V>
{
@@ -60,7 +61,7 @@ public class UnPaddedOutputFormat<K extends Writable, V extends Writable> extend
String name, Progressable progress) throws IOException {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
- FSDataOutputStream fileOut = fs.create(file, true, job.getInt("io.file.buffer.size", 4096), progress);
+ FSDataOutputStream fileOut = fs.create(file, true, job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096), progress);
return new UnpaddedRecordWriter<K, V>(fileOut);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/DelegatingInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/DelegatingInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/DelegatingInputFormat.java
index 9316a0d..9268f44 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/DelegatingInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/DelegatingInputFormat.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
/**
* An {@link InputFormat} that delegates behaviour of paths to multiple other
@@ -127,9 +128,9 @@ public class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
InputSplit inputSplit = taggedInputSplit.getInputSplit();
if (inputSplit instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) inputSplit;
- conf.set("map.input.file", fileSplit.getPath().toString());
- conf.setLong("map.input.start", fileSplit.getStart());
- conf.setLong("map.input.length", fileSplit.getLength());
+ conf.set(MRConfigurationNames.MR_MAP_INPUT_FILE, fileSplit.getPath().toString());
+ conf.setLong(MRConfigurationNames.MR_MAP_INPUT_START, fileSplit.getStart());
+ conf.setLong(MRConfigurationNames.MR_MAP_INPUT_LENGTH, fileSplit.getLength());
}
return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/MultipleInputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/MultipleInputs.java b/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/MultipleInputs.java
index 5fbe178..8bc69a7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/MultipleInputs.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/hadoopfix/MultipleInputs.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.lib.DelegatingMapper;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
/**
* This class supports MapReduce jobs that have multiple input paths with
@@ -49,8 +50,8 @@ public class MultipleInputs {
String inputFormatMapping = path.toString() + ";"
+ inputFormatClass.getName();
- String inputFormats = conf.get("mapred.input.dir.formats");
- conf.set("mapred.input.dir.formats",
+ String inputFormats = conf.get(MRConfigurationNames.MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS);
+ conf.set(MRConfigurationNames.MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS,
inputFormats == null ? inputFormatMapping : inputFormats + ","
+ inputFormatMapping);
@@ -73,8 +74,8 @@ public class MultipleInputs {
addInputPath(conf, path, inputFormatClass);
String mapperMapping = path.toString() + ";" + mapperClass.getName();
- String mappers = conf.get("mapred.input.dir.mappers");
- conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
+ String mappers = conf.get(MRConfigurationNames.MR_INPUT_MULTIPLEINPUTS_DIR_MAPPERS);
+ conf.set(MRConfigurationNames.MR_INPUT_MULTIPLEINPUTS_DIR_MAPPERS, mappers == null ? mapperMapping
: mappers + "," + mapperMapping);
conf.setMapperClass(DelegatingMapper.class);
@@ -90,7 +91,7 @@ public class MultipleInputs {
*/
static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
- String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
+ String[] pathMappings = conf.get(MRConfigurationNames.MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS).split(",");
for (String pathMapping : pathMappings) {
String[] split = pathMapping.split(";");
InputFormat inputFormat;
@@ -115,11 +116,11 @@ public class MultipleInputs {
*/
@SuppressWarnings("unchecked")
static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
- if (conf.get("mapred.input.dir.mappers") == null) {
+ if (conf.get(MRConfigurationNames.MR_INPUT_MULTIPLEINPUTS_DIR_MAPPERS) == null) {
return Collections.emptyMap();
}
Map<Path, Class<? extends Mapper>> m = new HashMap<Path, Class<? extends Mapper>>();
- String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
+ String[] pathMappings = conf.get(MRConfigurationNames.MR_INPUT_MULTIPLEINPUTS_DIR_MAPPERS).split(",");
for (String pathMapping : pathMappings) {
String[] split = pathMapping.split(";");
Class<? extends Mapper> mapClass;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
index fd22057..06b15e0 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.wink.json4j.JSONException;
-
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
@@ -102,7 +101,7 @@ public class CSVAssignRowIDMapper extends MapReduceBase implements Mapper<LongWr
thisIndex=MRJobConfiguration.getInputMatrixIndexesInMapper(job).get(0);
outKey.set(thisIndex);
FileSystem fs=FileSystem.get(job);
- Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
+ Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
filename=thisPath.toString();
String[] strs=job.getStrings(CSVReblockMR.SMALLEST_FILE_NAME_PER_INPUT);
Path headerPath=new Path(strs[thisIndex]).makeQualified(fs);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java
index 81102e8..bf00997 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-
import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
import org.apache.sysml.runtime.io.IOUtilFunctions;
import org.apache.sysml.runtime.matrix.CSVReblockMR;
@@ -154,7 +153,7 @@ public class CSVReblockMapper extends MapperBase implements Mapper<LongWritable,
try
{
FileSystem fs = FileSystem.get(job);
- Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
+ Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
String filename=thisPath.toString();
Path headerPath=new Path(job.getStrings(CSVReblockMR.SMALLEST_FILE_NAME_PER_INPUT)[matrixIndex]).makeQualified(fs);
if(headerPath.toString().equals(filename))
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
index cd6e781..c2996c2 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
@@ -32,16 +32,52 @@ public abstract class MRConfigurationNames {
protected static final Log LOG = LogFactory.getLog(MRConfigurationNames.class.getName());
- public static final String DFS_BLOCKSIZE;
- public static final String DFS_METRICS_SESSION_ID;
- public static final String DFS_PERMISSIONS_ENABLED;
- public static final String MR_CLUSTER_LOCAL_DIR;
- public static final String MR_JOBTRACKER_ADDRESS;
- public static final String MR_JOBTRACKER_SYSTEM_DIR;
- public static final String MR_TASK_IO_SORT_MB;
- public static final String MR_TASKTRACKER_TASKCONTROLLER;
+ // non-deprecated properties
+ public static final String DFS_DATANODE_DATA_DIR_PERM = "dfs.datanode.data.dir.perm"; // hdfs-default.xml
+ public static final String DFS_REPLICATION = "dfs.replication"; // hdfs-default.xml
+ public static final String IO_FILE_BUFFER_SIZE = "io.file.buffer.size"; // core-default.xml
+ public static final String IO_SERIALIZATIONS = "io.serializations"; // core-default.xml
+ public static final String MR_APPLICATION_CLASSPATH = "mapreduce.application.classpath"; // mapred-default.xml
+ public static final String MR_CHILD_JAVA_OPTS = "mapred.child.java.opts"; // mapred-default.xml
+ public static final String MR_FRAMEWORK_NAME = "mapreduce.framework.name"; // mapred-default.xml
+ public static final String MR_JOBTRACKER_STAGING_ROOT_DIR = "mapreduce.jobtracker.staging.root.dir"; // mapred-default.xml
+ public static final String MR_TASKTRACKER_GROUP = "mapreduce.tasktracker.group"; // mapred-default.xml
+ public static final String YARN_APP_MR_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb"; // mapred-default.xml
- // initialize to currently used cluster
+ // deprecated properties replaced by new props, new prop names used for constants
+ // see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
+ public static final String DFS_BLOCKSIZE; // hdfs-default.xml
+ // public static final String DFS_DATANODE_DATA_DIR; // hdfs-default.xml - currently not used
+ // public static final String DFS_METRICS_SESSION_ID; // N/A - currently not used
+ public static final String DFS_PERMISSIONS_ENABLED; // hdfs-default.xml
+ public static final String FS_DEFAULTFS; // core-default.xml
+ public static final String MR_CLUSTER_LOCAL_DIR; // mapred-default.xml
+ public static final String MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE; // N/A
+ public static final String MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS; // N/A
+ public static final String MR_INPUT_MULTIPLEINPUTS_DIR_MAPPERS; // N/A
+ public static final String MR_JOB_ID; // N/A
+ public static final String MR_JOBTRACKER_ADDRESS; // mapred-default.xml
+ public static final String MR_JOBTRACKER_SYSTEM_DIR; // mapred-default.xml
+ public static final String MR_MAP_INPUT_FILE; // N/A
+ public static final String MR_MAP_INPUT_LENGTH; // N/A
+ public static final String MR_MAP_INPUT_START; // N/A
+ // NOTE: mapreduce.map.java.opts commented out in mapred-default.xml so as to "not override mapred.child.java.opts"
+ public static final String MR_MAP_JAVA_OPTS;
+ public static final String MR_MAP_MAXATTEMPTS; // mapred-default.xml
+ public static final String MR_MAP_MEMORY_MB; // mapred-default.xml
+ public static final String MR_MAP_OUTPUT_COMPRESS; // N/A
+ public static final String MR_MAP_OUTPUT_COMPRESS_CODEC; // N/A
+ public static final String MR_MAP_SORT_SPILL_PERCENT; // mapred-default.xml
+ public static final String MR_REDUCE_INPUT_BUFFER_PERCENT; // N/A
+ // NOTE: mapreduce.reduce.java.opts commented out in mapred-default.xml so as to not override mapred.child.java.opts
+ public static final String MR_REDUCE_JAVA_OPTS;
+ public static final String MR_REDUCE_MEMORY_MB; // mapred-default.xml
+ public static final String MR_TASK_ID; // N/A
+ public static final String MR_TASK_IO_SORT_MB; // mapred-default.xml
+ public static final String MR_TASK_TIMEOUT; // N/A
+ public static final String MR_TASKTRACKER_TASKCONTROLLER; // mapred-default.xml
+
+ // initialize constants based on hadoop version
static {
// determine hadoop version
String version = VersionInfo.getBuildVersion();
@@ -51,22 +87,62 @@ public abstract class MRConfigurationNames {
if (hadoopVersion2) {
LOG.debug("Using hadoop 2.x configuration properties.");
DFS_BLOCKSIZE = "dfs.blocksize";
- DFS_METRICS_SESSION_ID = "dfs.metrics.session-id";
+ // DFS_DATANODE_DATA_DIR = "dfs.datanode.data.dir";
+ // DFS_METRICS_SESSION_ID = "dfs.metrics.session-id";
DFS_PERMISSIONS_ENABLED = "dfs.permissions.enabled";
+ FS_DEFAULTFS = "fs.defaultFS";
MR_CLUSTER_LOCAL_DIR = "mapreduce.cluster.local.dir";
+ MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
+ MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS = "mapreduce.input.multipleinputs.dir.formats";
+ MR_INPUT_MULTIPLEINPUTS_DIR_MAPPERS = "mapreduce.input.multipleinputs.dir.mappers";
+ MR_JOB_ID = "mapreduce.job.id";
MR_JOBTRACKER_ADDRESS = "mapreduce.jobtracker.address";
MR_JOBTRACKER_SYSTEM_DIR = "mapreduce.jobtracker.system.dir";
+ MR_MAP_INPUT_FILE = "mapreduce.map.input.file";
+ MR_MAP_INPUT_LENGTH = "mapreduce.map.input.length";
+ MR_MAP_INPUT_START = "mapreduce.map.input.start";
+ MR_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
+ MR_MAP_MAXATTEMPTS = "mapreduce.map.maxattempts";
+ MR_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
+ MR_MAP_OUTPUT_COMPRESS = "mapreduce.map.output.compress";
+ MR_MAP_OUTPUT_COMPRESS_CODEC = "mapreduce.map.output.compress.codec";
+ MR_MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent";
+ MR_REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
+ MR_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
+ MR_REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
+ MR_TASK_ID = "mapreduce.task.id";
MR_TASK_IO_SORT_MB = "mapreduce.task.io.sort.mb";
+ MR_TASK_TIMEOUT = "mapreduce.task.timeout";
MR_TASKTRACKER_TASKCONTROLLER = "mapreduce.tasktracker.taskcontroller";
} else { // any older version
LOG.debug("Using hadoop 1.x configuration properties.");
DFS_BLOCKSIZE = "dfs.block.size";
- DFS_METRICS_SESSION_ID = "session.id";
+ // DFS_DATANODE_DATA_DIR = "dfs.data.dir";
+ // DFS_METRICS_SESSION_ID = "session.id";
DFS_PERMISSIONS_ENABLED = "dfs.permissions";
+ FS_DEFAULTFS = "fs.default.name";
MR_CLUSTER_LOCAL_DIR = "mapred.local.dir";
+ MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE = "mapred.max.split.size";
+ MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS = "mapred.input.dir.formats";
+ MR_INPUT_MULTIPLEINPUTS_DIR_MAPPERS = "mapred.input.dir.mappers";
+ MR_JOB_ID = "mapred.job.id";
MR_JOBTRACKER_ADDRESS = "mapred.job.tracker";
MR_JOBTRACKER_SYSTEM_DIR = "mapred.system.dir";
+ MR_MAP_INPUT_FILE = "map.input.file";
+ MR_MAP_INPUT_LENGTH = "map.input.length";
+ MR_MAP_INPUT_START = "map.input.start";
+ MR_MAP_JAVA_OPTS = "mapred.map.child.java.opts";
+ MR_MAP_MAXATTEMPTS = "mapred.map.max.attempts";
+ MR_MAP_MEMORY_MB = "mapred.job.map.memory.mb";
+ MR_MAP_OUTPUT_COMPRESS = "mapred.compress.map.output";
+ MR_MAP_OUTPUT_COMPRESS_CODEC = "mapred.map.output.compression.codec";
+ MR_MAP_SORT_SPILL_PERCENT = "io.sort.spill.percent";
+ MR_REDUCE_INPUT_BUFFER_PERCENT = "mapred.job.reduce.input.buffer.percent";
+ MR_REDUCE_JAVA_OPTS = "mapred.reduce.child.java.opts";
+ MR_REDUCE_MEMORY_MB = "mapred.job.reduce.memory.mb";
+ MR_TASK_ID = "mapred.tip.id";
MR_TASK_IO_SORT_MB = "io.sort.mb";
+ MR_TASK_TIMEOUT = "mapred.task.timeout";
MR_TASKTRACKER_TASKCONTROLLER = "mapred.task.tracker.task-controller";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index 6d30b5e..a4de41a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineSequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
@@ -273,12 +272,12 @@ public class MRJobConfiguration
public static final int getMiscMemRequired(JobConf job)
{
- return job.getInt("io.file.buffer.size", 4096);
+ return job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096);
}
public static final int getJVMMaxMemSize(JobConf job)
{
- String str=job.get("mapred.child.java.opts");
+ String str=job.get(MRConfigurationNames.MR_CHILD_JAVA_OPTS);
int start=str.indexOf("-Xmx");
if(start<0)
return 209715200; //default 200MB
@@ -438,7 +437,7 @@ public class MRJobConfiguration
job.set(MRConfigurationNames.MR_JOBTRACKER_SYSTEM_DIR, job.get(MRConfigurationNames.MR_JOBTRACKER_SYSTEM_DIR) + uniqueSubdir);
//unique staging dir
- job.set( "mapreduce.jobtracker.staging.root.dir", job.get("mapreduce.jobtracker.staging.root.dir") + uniqueSubdir );
+ job.set( MRConfigurationNames.MR_JOBTRACKER_STAGING_ROOT_DIR, job.get(MRConfigurationNames.MR_JOBTRACKER_STAGING_ROOT_DIR) + uniqueSubdir );
}
}
@@ -454,7 +453,7 @@ public class MRJobConfiguration
public static String getStagingWorkingDirPrefix(JobConf job)
{
- return job.get("mapreduce.jobtracker.staging.root.dir");
+ return job.get(MRConfigurationNames.MR_JOBTRACKER_STAGING_ROOT_DIR);
}
/**
@@ -465,7 +464,7 @@ public class MRJobConfiguration
{
String dir = DMLConfig.LOCAL_MR_MODE_STAGING_DIR +
Lop.FILE_SEPARATOR + Lop.PROCESS_PREFIX + DMLScript.getUUID() + Lop.FILE_SEPARATOR;
- job.set( "mapreduce.jobtracker.staging.root.dir", dir );
+ job.set( MRConfigurationNames.MR_JOBTRACKER_STAGING_ROOT_DIR, dir );
}
public static void setInputInfo(JobConf job, byte input, InputInfo inputinfo,
@@ -773,7 +772,7 @@ public class MRJobConfiguration
public static String getResultMergeStagingDir( JobConf job )
{
- return job.get(RESULTMERGE_STAGING_DIR_CONFIG) + job.get("mapred.tip.id");
+ return job.get(RESULTMERGE_STAGING_DIR_CONFIG) + job.get(MRConfigurationNames.MR_TASK_ID);
}
public static long[] getResultMergeMatrixCharacteristics( JobConf job )
@@ -826,7 +825,7 @@ public class MRJobConfiguration
matrices[i]=new Path(matrices[i]).toString();
FileSystem fs=FileSystem.get(job);
- Path thisFile=new Path(job.get("map.input.file")).makeQualified(fs);
+ Path thisFile=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
//Path p=new Path(thisFileName);
@@ -1192,10 +1191,10 @@ public class MRJobConfiguration
long sizeSortBuff = InfrastructureAnalyzer.getRemoteMaxMemorySortBuffer();
long sizeHDFSBlk = InfrastructureAnalyzer.getHDFSBlockSize();
long newSplitSize = sizeHDFSBlk * 2;
- double spillPercent = job.getDouble("mapreduce.map.sort.spill.percent", 1.0);
+ double spillPercent = job.getDouble(MRConfigurationNames.MR_MAP_SORT_SPILL_PERCENT, 1.0);
int numPMap = OptimizerUtils.getNumMappers();
if( numPMap < totalInputSize/newSplitSize && sizeSortBuff*spillPercent >= newSplitSize && lpaths.size()==1 ) {
- job.setLong("mapreduce.input.fileinputformat.split.maxsize", newSplitSize);
+ job.setLong(MRConfigurationNames.MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE, newSplitSize);
combineInputFormat = true;
}
}
@@ -1919,8 +1918,8 @@ public class MRJobConfiguration
public static void addBinaryBlockSerializationFramework( Configuration job )
{
- String frameworkList = job.get("io.serializations");
+ String frameworkList = job.get(MRConfigurationNames.IO_SERIALIZATIONS);
String frameworkClassBB = BinaryBlockSerialization.class.getCanonicalName();
- job.set("io.serializations", frameworkClassBB+","+frameworkList);
+ job.set(MRConfigurationNames.IO_SERIALIZATIONS, frameworkClassBB+","+frameworkList);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
index c51652f..043c5c3 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.instructions.InstructionParser;
@@ -40,6 +39,7 @@ import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
import org.apache.sysml.runtime.matrix.mapred.CSVReblockReducer;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
@@ -86,7 +86,7 @@ public class ApplyTfBBMR {
//set up the instructions that will happen in the reducer, after the aggregation instrucions
MRJobConfiguration.setInstructionsInReducer(job, otherInst);
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
//set up preferred custom serialization framework for binary block format
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
index b91ab60..580cc87 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
@@ -34,13 +34,13 @@ import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.wink.json4j.JSONException;
-
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
import org.apache.sysml.runtime.matrix.CSVReblockMR;
import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper.IndexedBlockRow;
import org.apache.sysml.runtime.matrix.mapred.MapperBase;
@@ -72,7 +72,7 @@ public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable,
Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
FileSystem fs = FileSystem.get(job);
- Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
+ Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
String thisfile=thisPath.toString();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
index 7c323f4..6d06ff0 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
@@ -35,11 +35,11 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
-
import org.apache.sysml.runtime.matrix.CSVReblockMR;
import org.apache.sysml.runtime.matrix.JobReturn;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
@@ -75,7 +75,7 @@ public class ApplyTfCSVMR {
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
FileInputFormat.addInputPath(job, new Path(inputPath));
// delete outputPath, if exists already.
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2bcfdc6c/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
index 09b9148..6c4f237 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
@@ -33,8 +33,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
/**
@@ -67,7 +67,7 @@ public class GenTfMtdMR {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
- job.setInt("dfs.replication", replication);
+ job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
FileInputFormat.addInputPath(job, new Path(inputPath));
// delete outputPath, if exists already.