You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/02/13 04:49:07 UTC
incubator-systemml git commit: [SYSTEMML-513] Fix robustness backwards compatibility Hadoop 1.x / MRv1
Repository: incubator-systemml
Updated Branches:
refs/heads/master ff7171303 -> ce9392741
[SYSTEMML-513] Fix robustness backwards compatibility Hadoop 1.x / MRv1
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/ce939274
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/ce939274
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/ce939274
Branch: refs/heads/master
Commit: ce9392741876708494f4657639debbe8a90430c7
Parents: ff71713
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Feb 11 20:38:17 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Feb 12 19:48:40 2016 -0800
----------------------------------------------------------------------
.../matrix/data/MultipleOutputCommitter.java | 40 ++++++++------------
.../matrix/mapred/MRConfigurationNames.java | 37 ++++++++++++------
2 files changed, 41 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ce939274/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java
index 408ddc2..ac663c8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java
@@ -31,14 +31,12 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
-
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
public class MultipleOutputCommitter extends FileOutputCommitter
-{
-
- // maintain the map of matrix index to its final output dir
+{
+ // maintain the map of matrix index to its destination output dir
// private HashMap<Byte, String> outputmap=new HashMap<Byte, String>();
private String[] outputs;
@@ -83,17 +81,17 @@ public class MultipleOutputCommitter extends FileOutputCommitter
// get the mapping between index to output filename
outputs = MRJobConfiguration.getOutputs(conf);
- //get temp task output path (compatible with hadoop1 and hadoop2)
+ // get temp task output path (compatible with hadoop1 and hadoop2)
Path taskOutPath = FileOutputFormat.getWorkOutputPath(conf);
FileSystem fs = taskOutPath.getFileSystem(conf);
if( !fs.exists(taskOutPath) )
throw new IOException("Task output path "+ taskOutPath.toString() + "does not exist.");
- // Move the task outputs to their final places
+ // move the task outputs to their final places
context.getProgressible().progress();
moveFinalTaskOutputs(context, fs, taskOutPath);
- // Delete the temporary task-specific output directory
+ // delete the temporary task-specific output directory
if( !fs.delete(taskOutPath, true) )
LOG.debug("Failed to delete the temporary output directory of task: " + attemptId + " - " + taskOutPath);
}
@@ -110,8 +108,7 @@ public class MultipleOutputCommitter extends FileOutputCommitter
{
context.getProgressible().progress();
- if( fs.getFileStatus(taskOutput).isDirectory() )
- {
+ if( fs.getFileStatus(taskOutput).isDirectory() ) {
FileStatus[] files = fs.listStatus(taskOutput);
if (files != null)
for (FileStatus file : files) //for all files
@@ -130,24 +127,19 @@ public class MultipleOutputCommitter extends FileOutputCommitter
private void moveFileToDestination(TaskAttemptContext context, FileSystem fs, Path file)
throws IOException
{
- JobConf conf = context.getJobConf();
TaskAttemptID attemptId = context.getTaskAttemptID();
- //get output index and final destination
- String taskType = (conf.getBoolean(JobContext.TASK_ISMAP, true)) ? "m" : "r";
- String name = file.getName();
- int charIx = name.indexOf("-"+taskType+"-");
- int index = Integer.parseInt(name.substring(0, charIx));
- Path finalPath = new Path(outputs[index], file.getName());
+ // get output index and final destination
+ String name = file.getName(); //e.g., 0-r-00000
+ int index = Integer.parseInt(name.substring(0, name.indexOf("-")));
+ Path dest = new Path(outputs[index], name); //e.g., outX/0-r-00000
- //move file from 'file' to 'finalPath'
- if( !fs.rename(file, finalPath) )
- {
- if (!fs.delete(finalPath, true))
- throw new IOException("Failed to delete earlier output " + finalPath + " for rename of " + file + " in task " + attemptId);
- if (!fs.rename(file, finalPath))
- throw new IOException("Failed to save output " + finalPath + " for rename of " + file + " in task: " + attemptId);
+ // move file from 'file' to 'finalPath'
+ if( !fs.rename(file, dest) ) {
+ if (!fs.delete(dest, true))
+ throw new IOException("Failed to delete earlier output " + dest + " for rename of " + file + " in task " + attemptId);
+ if (!fs.rename(file, dest))
+ throw new IOException("Failed to save output " + dest + " for rename of " + file + " in task: " + attemptId);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ce939274/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
index dc9343d..fb98e1c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.matrix.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.sysml.conf.ConfigurationManager;
/**
* This class provides a central local for used hadoop configuration properties. For portability, we support both hadoop
@@ -81,17 +82,34 @@ public abstract class MRConfigurationNames {
// initialize constants based on hadoop version
static {
// determine hadoop version
- String version = VersionInfo.getBuildVersion();
- boolean hadoopVersion2 = version.startsWith("2");
- LOG.debug("Hadoop build version: " + version);
-
- if (hadoopVersion2) {
+ String hversion = VersionInfo.getBuildVersion();
+ boolean hadoop2 = hversion.startsWith("2");
+ LOG.debug("Hadoop build version: " + hversion);
+
+ // determine mapreduce version
+ String mrversion = ConfigurationManager.getCachedJobConf().get(MR_FRAMEWORK_NAME);
+ boolean mrv2 = !(mrversion == null || mrversion.equals("classic"));
+
+ //handle hadoop configurations
+ if( hadoop2 ) {
LOG.debug("Using hadoop 2.x configuration properties.");
DFS_BLOCKSIZE = "dfs.blocksize";
// DFS_DATANODE_DATA_DIR = "dfs.datanode.data.dir";
// DFS_METRICS_SESSION_ID = "dfs.metrics.session-id";
DFS_PERMISSIONS_ENABLED = "dfs.permissions.enabled";
FS_DEFAULTFS = "fs.defaultFS";
+ }
+ else {
+ LOG.debug("Using hadoop 1.x configuration properties.");
+ DFS_BLOCKSIZE = "dfs.block.size";
+ // DFS_DATANODE_DATA_DIR = "dfs.data.dir";
+ // DFS_METRICS_SESSION_ID = "session.id";
+ DFS_PERMISSIONS_ENABLED = "dfs.permissions";
+ FS_DEFAULTFS = "fs.default.name";
+ }
+
+ //handle mapreduce configurations
+ if( mrv2 ) {
MR_CLUSTER_LOCAL_DIR = "mapreduce.cluster.local.dir";
MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS = "mapreduce.input.multipleinputs.dir.formats";
@@ -116,13 +134,8 @@ public abstract class MRConfigurationNames {
MR_TASK_IO_SORT_MB = "mapreduce.task.io.sort.mb";
MR_TASK_TIMEOUT = "mapreduce.task.timeout";
MR_TASKTRACKER_TASKCONTROLLER = "mapreduce.tasktracker.taskcontroller";
- } else { // any older version
- LOG.debug("Using hadoop 1.x configuration properties.");
- DFS_BLOCKSIZE = "dfs.block.size";
- // DFS_DATANODE_DATA_DIR = "dfs.data.dir";
- // DFS_METRICS_SESSION_ID = "session.id";
- DFS_PERMISSIONS_ENABLED = "dfs.permissions";
- FS_DEFAULTFS = "fs.default.name";
+ }
+ else { // mrv1
MR_CLUSTER_LOCAL_DIR = "mapred.local.dir";
MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE = "mapred.max.split.size";
MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS = "mapred.input.dir.formats";