Posted to commits@systemml.apache.org by mb...@apache.org on 2016/02/13 04:49:07 UTC

incubator-systemml git commit: [SYSTEMML-513] Fix robustness of backwards compatibility with Hadoop 1.x / MRv1

Repository: incubator-systemml
Updated Branches:
  refs/heads/master ff7171303 -> ce9392741


[SYSTEMML-513] Fix robustness of backwards compatibility with Hadoop 1.x / MRv1

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/ce939274
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/ce939274
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/ce939274

Branch: refs/heads/master
Commit: ce9392741876708494f4657639debbe8a90430c7
Parents: ff71713
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Feb 11 20:38:17 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Feb 12 19:48:40 2016 -0800

----------------------------------------------------------------------
 .../matrix/data/MultipleOutputCommitter.java    | 40 ++++++++------------
 .../matrix/mapred/MRConfigurationNames.java     | 37 ++++++++++++------
 2 files changed, 41 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ce939274/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java
index 408ddc2..ac663c8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MultipleOutputCommitter.java
@@ -31,14 +31,12 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
-
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 
 
 public class MultipleOutputCommitter extends FileOutputCommitter 
-{
-	
-	// maintain the map of matrix index to its final output dir
+{	
+	// maintain the map of matrix index to its destination output dir
 	// private HashMap<Byte, String> outputmap=new HashMap<Byte, String>();
 	private String[] outputs;
 
@@ -83,17 +81,17 @@ public class MultipleOutputCommitter extends FileOutputCommitter
 		// get the mapping between index to output filename
 		outputs = MRJobConfiguration.getOutputs(conf);
 		
-		//get temp task output path (compatible with hadoop1 and hadoop2)
+		// get temp task output path (compatible with hadoop1 and hadoop2)
 		Path taskOutPath = FileOutputFormat.getWorkOutputPath(conf);
 		FileSystem fs = taskOutPath.getFileSystem(conf);
 		if( !fs.exists(taskOutPath) )
 			throw new IOException("Task output path "+ taskOutPath.toString() + "does not exist.");
 		
-		// Move the task outputs to their final places
+		// move the task outputs to their final places
 		context.getProgressible().progress();
 		moveFinalTaskOutputs(context, fs, taskOutPath);
 		
-		// Delete the temporary task-specific output directory
+		// delete the temporary task-specific output directory
 		if( !fs.delete(taskOutPath, true) ) 
 			LOG.debug("Failed to delete the temporary output directory of task: " + attemptId + " - " + taskOutPath);
 	}
@@ -110,8 +108,7 @@ public class MultipleOutputCommitter extends FileOutputCommitter
 	{
 		context.getProgressible().progress();
 		
-		if( fs.getFileStatus(taskOutput).isDirectory() ) 
-		{
+		if( fs.getFileStatus(taskOutput).isDirectory() ) {
 			FileStatus[] files = fs.listStatus(taskOutput);
 			if (files != null)
 				for (FileStatus file : files) //for all files
@@ -130,24 +127,19 @@ public class MultipleOutputCommitter extends FileOutputCommitter
 	private void moveFileToDestination(TaskAttemptContext context, FileSystem fs, Path file) 
 		throws IOException 
 	{
-		JobConf conf = context.getJobConf();
 		TaskAttemptID attemptId = context.getTaskAttemptID();
 		
-		//get output index and final destination
-		String taskType = (conf.getBoolean(JobContext.TASK_ISMAP, true)) ? "m" : "r";
-		String name =  file.getName(); 
-		int charIx = name.indexOf("-"+taskType+"-");
-		int index = Integer.parseInt(name.substring(0, charIx));
-		Path finalPath = new Path(outputs[index], file.getName());
+		// get output index and final destination 
+		String name =  file.getName(); //e.g., 0-r-00000 
+		int index = Integer.parseInt(name.substring(0, name.indexOf("-")));
+		Path dest = new Path(outputs[index], name); //e.g., outX/0-r-00000
 		
-		//move file from 'file' to 'finalPath'
-		if( !fs.rename(file, finalPath) ) 
-		{
-			if (!fs.delete(finalPath, true))
-				throw new IOException("Failed to delete earlier output " + finalPath + " for rename of " + file + " in task " + attemptId);
-			if (!fs.rename(file, finalPath)) 
-				throw new IOException("Failed to save output " + finalPath + " for rename of " + file + " in task: " + attemptId);
+		// move file from 'file' to 'dest'
+		if( !fs.rename(file, dest) ) {
+			if (!fs.delete(dest, true))
+				throw new IOException("Failed to delete earlier output " + dest + " for rename of " + file + " in task " + attemptId);
+			if (!fs.rename(file, dest)) 
+				throw new IOException("Failed to save output " + dest + " for rename of " + file + " in task: " + attemptId);
 		}
 	}
-
 }
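
For context: the removed code above selected the task-type infix via conf.getBoolean(JobContext.TASK_ISMAP, true), which is not portable between the Hadoop 1.x/MRv1 and 2.x mapred APIs, while the replacement parses the matrix index directly from the task output filename. A minimal self-contained sketch of that parsing (not part of the commit; example values are hypothetical), assuming the standard <index>-<m|r>-<partition> output naming shown in the comments above:

    import org.apache.hadoop.fs.Path;

    public class IndexParseExample {
        public static void main(String[] args) {
            // destination output dirs, keyed by matrix index (hypothetical values)
            String[] outputs = { "outX" };
            // task output filename, e.g. reducer part 00000 of matrix 0
            String name = "0-r-00000";
            // matrix index = prefix before the first '-', independent of task type
            int index = Integer.parseInt(name.substring(0, name.indexOf("-")));
            // final destination, e.g. outX/0-r-00000
            Path dest = new Path(outputs[index], name);
            System.out.println(dest);
        }
    }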

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ce939274/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
index dc9343d..fb98e1c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRConfigurationNames.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.matrix.mapred;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.sysml.conf.ConfigurationManager;
 
 /**
 * This class provides a central location for the hadoop configuration properties in use. For portability, we support both hadoop
@@ -81,17 +82,34 @@ public abstract class MRConfigurationNames {
 	// initialize constants based on hadoop version
 	static {
 		// determine hadoop version
-		String version = VersionInfo.getBuildVersion();
-		boolean hadoopVersion2 = version.startsWith("2");
-		LOG.debug("Hadoop build version: " + version);
-
-		if (hadoopVersion2) {
+		String hversion = VersionInfo.getBuildVersion();
+		boolean hadoop2 = hversion.startsWith("2");
+		LOG.debug("Hadoop build version: " + hversion);
+		
+		// determine mapreduce version
+		String mrversion = ConfigurationManager.getCachedJobConf().get(MR_FRAMEWORK_NAME);
+		boolean mrv2 = !(mrversion == null || mrversion.equals("classic")); 
+		
+		//handle hadoop configurations
+		if( hadoop2 ) {
 			LOG.debug("Using hadoop 2.x configuration properties.");
 			DFS_BLOCKSIZE = "dfs.blocksize";
 			// DFS_DATANODE_DATA_DIR = "dfs.datanode.data.dir";
 			// DFS_METRICS_SESSION_ID = "dfs.metrics.session-id";
 			DFS_PERMISSIONS_ENABLED = "dfs.permissions.enabled";
 			FS_DEFAULTFS = "fs.defaultFS";
+		}
+		else {
+			LOG.debug("Using hadoop 1.x configuration properties.");
+			DFS_BLOCKSIZE = "dfs.block.size";
+			// DFS_DATANODE_DATA_DIR = "dfs.data.dir";
+			// DFS_METRICS_SESSION_ID = "session.id";
+			DFS_PERMISSIONS_ENABLED = "dfs.permissions";
+			FS_DEFAULTFS = "fs.default.name";			
+		}
+			
+		//handle mapreduce configurations
+		if( mrv2 ) {	
 			MR_CLUSTER_LOCAL_DIR = "mapreduce.cluster.local.dir";
 			MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
 			MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS = "mapreduce.input.multipleinputs.dir.formats";
@@ -116,13 +134,8 @@ public abstract class MRConfigurationNames {
 			MR_TASK_IO_SORT_MB = "mapreduce.task.io.sort.mb";
 			MR_TASK_TIMEOUT = "mapreduce.task.timeout";
 			MR_TASKTRACKER_TASKCONTROLLER = "mapreduce.tasktracker.taskcontroller";
-		} else { // any older version
-			LOG.debug("Using hadoop 1.x configuration properties.");
-			DFS_BLOCKSIZE = "dfs.block.size";
-			// DFS_DATANODE_DATA_DIR = "dfs.data.dir";
-			// DFS_METRICS_SESSION_ID = "session.id";
-			DFS_PERMISSIONS_ENABLED = "dfs.permissions";
-			FS_DEFAULTFS = "fs.default.name";
+		} 
+		else { // mrv1
 			MR_CLUSTER_LOCAL_DIR = "mapred.local.dir";
 			MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE = "mapred.max.split.size";
 			MR_INPUT_MULTIPLEINPUTS_DIR_FORMATS = "mapred.input.dir.formats";
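
The net effect of the MRConfigurationNames change is that the HDFS property names now track the Hadoop build version, while the MapReduce property names independently track the configured framework, so e.g. a Hadoop 2 build running the "classic" MRv1 framework picks up dfs.blocksize together with the mapred.* names. A condensed, self-contained sketch of the decoupled detection (not from the commit), mirroring the static initializer above and assuming MR_FRAMEWORK_NAME resolves to "mapreduce.framework.name":

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.VersionInfo;

    public class VersionDetectExample {
        public static void main(String[] args) {
            // hadoop2 selects the dfs.* / fs.* property names
            boolean hadoop2 = VersionInfo.getBuildVersion().startsWith("2");
            // mrv2 selects the mapreduce.* vs. mapred.* property names;
            // an unset value or "classic" indicates MRv1
            String mrversion = new JobConf().get("mapreduce.framework.name");
            boolean mrv2 = !(mrversion == null || mrversion.equals("classic"));
            System.out.println("hadoop2=" + hadoop2 + ", mrv2=" + mrv2);
        }
    }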