Posted to commits@oodt.apache.org by bf...@apache.org on 2011/01/28 01:18:17 UTC

svn commit: r1064378 - in /oodt/branches/wengine-branch/wengine: ./ src/main/java/org/apache/oodt/cas/workflow/engine/runner/

Author: bfoster
Date: Fri Jan 28 00:18:17 2011
New Revision: 1064378

URL: http://svn.apache.org/viewvc?rev=1064378&view=rev
Log:

- beginnings of plugging wengine into hadoop

------------------------

Added:
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java   (with props)
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java   (with props)
Modified:
    oodt/branches/wengine-branch/wengine/pom.xml

Modified: oodt/branches/wengine-branch/wengine/pom.xml
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/pom.xml?rev=1064378&r1=1064377&r2=1064378&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/pom.xml (original)
+++ oodt/branches/wengine-branch/wengine/pom.xml Fri Jan 28 00:18:17 2011
@@ -170,6 +170,11 @@
 			<version>2.6.2</version>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-mapred</artifactId>
+			<version>0.21.1-SNAPSHOT</version>
+		</dependency>
+		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
 			<version>3.8.2</version>

Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,13 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+public interface HadoopMapReduceable {
+
+	public String getMapperClass();
+	
+	public String getCombinerClass();
+	
+	public String getReducerClass();
+	
+	public int getNumOfReducers();
+	
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
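
For reference, a task opts into the map/reduce path by implementing this
interface alongside TaskInstance; HadoopRunner (below) then reflectively loads
the named classes. A minimal sketch (the class and the mapper/reducer names
are hypothetical, not part of this commit):

    package org.apache.oodt.cas.workflow.engine.runner;

    import org.apache.oodt.cas.workflow.instance.TaskInstance;

    // Hypothetical task; left abstract so TaskInstance's own contract stays out of the sketch.
    public abstract class WordCountTask extends TaskInstance implements HadoopMapReduceable {

        public String getMapperClass() {
            return "org.example.hadoop.WordCountMapper";  // assumed mapper class name
        }

        public String getCombinerClass() {
            return "org.example.hadoop.WordCountReducer"; // combiner reuses the reducer
        }

        public String getReducerClass() {
            return "org.example.hadoop.WordCountReducer"; // assumed reducer class name
        }

        public int getNumOfReducers() {
            return 4;
        }
    }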

Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,118 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+import com.thoughtworks.xstream.XStream;
+
+public class HadoopRunner extends EngineRunner {
+
+	public static final String DEPENDENCY_FILE_KEYS = "HadoopRunner/DependencyFileKeys";
+	public static final String INPUT_FILE_KEYS = "HadoopRunner/InputFileKeys";
+	
+	@Override
+	public void execute(TaskInstance workflowInstance) throws Exception {
+		Metadata instanceMetadata = workflowInstance.getMetadata();
+	    Configuration conf = new Configuration();
+	    Cluster cluster = new Cluster(conf);
+	    Job job = Job.getInstance(cluster);
+	    job.setJobName(workflowInstance.getModelId());
+	    job.setJarByClass(HadoopRunner.class);
+
+	    //setup input files
+	    List<String> inputFiles = this.getInputFiles(instanceMetadata);
+	    for (String inputFile : inputFiles)
+	    	FileInputFormat.addInputPath(job, new Path(inputFile));
+	    job.getConfiguration().set(HadoopTaskProperties.INPUT_FILES, StringUtils.join(inputFiles, ","));
+    	
+    	//setup deps files
+	    List<String> dependencyFiles = this.getDependencyFiles(instanceMetadata);
+	    for (String dependencyFile : dependencyFiles) 
+	    	job.addCacheFile(new Path(dependencyFile).toUri());
+	    job.getConfiguration().set(HadoopTaskProperties.DEPENDENCY_FILES, StringUtils.join(dependencyFiles, ","));
+
+		if (workflowInstance instanceof HadoopMapReduceable) {
+			HadoopMapReduceable mapReduceable = (HadoopMapReduceable) workflowInstance;
+			job.setMapperClass(Class.forName(mapReduceable.getMapperClass()).asSubclass(Mapper.class));
+			job.setCombinerClass(Class.forName(mapReduceable.getCombinerClass()).asSubclass(Reducer.class));
+			job.setReducerClass(Class.forName(mapReduceable.getReducerClass()).asSubclass(Reducer.class));
+			job.setNumReduceTasks(mapReduceable.getNumOfReducers());
+			job.setOutputFormatClass(HadoopTaskOutputFormat.class);
+		}else {
+			job.setMapperClass(HadoopTaskInstance.class);
+			job.setNumReduceTasks(0);
+			XStream xstream = new XStream();
+			String xstreamTask = "/temp/job-input/xstream/" + workflowInstance.getInstanceId() + "/" + workflowInstance.getModelId() + ".xstream";
+			String xmlTask = xstream.toXML(workflowInstance);
+			// write the serialized task out as the job's input file, and ship it in
+			// the configuration where HadoopTaskInstance expects to find it
+			FSDataOutputStream xstreamOut = cluster.getFileSystem().create(new Path(xstreamTask));
+			xstreamOut.writeBytes(xmlTask);
+			xstreamOut.close();
+			job.getConfiguration().set(HadoopTaskInstance.XSTREAM_TASK, xmlTask);
+			FileInputFormat.addInputPath(job, new Path(xstreamTask));
+		}
+		
+		job.submit();
+	}
+	
+	private List<String> getInputFiles(Metadata metadata) throws Exception {
+		Vector<String> inputFiles = new Vector<String>();
+		for (String inputFileKey : metadata.getMetadata(INPUT_FILE_KEYS).split(","))
+			inputFiles.addAll(metadata.getAllMetadata(inputFileKey));
+		return inputFiles;
+	}
+	
+	private List<String> getDependencyFiles(Metadata metadata) throws Exception {
+		Vector<String> dependencyFiles = new Vector<String>();
+		for (String dependencyFileKey : metadata.getMetadata(DEPENDENCY_FILE_KEYS).split(","))
+			dependencyFiles.addAll(metadata.getAllMetadata(dependencyFileKey));
+		return dependencyFiles;
+	}
+	
+	@Override
+	public int getOpenSlots(TaskInstance workflowInstance) throws Exception {
+		return Integer.MAX_VALUE;
+	}
+
+	@Override
+	public boolean hasOpenSlots(TaskInstance workflowInstance) throws Exception {
+		return true;
+	}
+
+//	public static class MapperTask extends Mapper<Object, Text, Text, File> {
+//		private Text outputFilesText = new Text("OutputFiles");
+//
+//		public void map(Object key, Text value, Context context)
+//				throws IOException, InterruptedException {
+//			Path[] localFiles = context.getLocalCacheFiles();
+//			File outputFile = new File("/path/to/output/file");
+//			int returnValue = ExecUtils.callProgram("/usr/bin/time -v -o log/runtime_" + value.toString() + ".txt  /path/to/exe config/${config_type}/l2_fp.config output/l2_" + value.toString() + ".h5 > log/l2_" + value.toString() + ".log.running 2>&1", (File) null);
+//			ExecUtils.callProgram("mv -f log/l2_" + value.toString() + ".log.running log/l2_" + value.toString() + ".log", (File) null);
+//			context.write(outputFilesText, outputFile);
+//		}
+//	}
+//
+//	public static class ReducerTask extends Reducer<Text, File, Text, File> {
+//		private Text outputFileText = new Text("OutputFile");
+//
+//		public void reduce(Text key, Iterable<File> values,
+//				Context context) throws IOException, InterruptedException {
+//			File outputFile = new File("/path/to/output/concat/file");
+//			context.write(outputFileText, outputFile);
+//		}
+//	}
+	
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
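
A note on the two *_FILE_KEYS properties above: each holds a comma-separated
list of metadata key names, and each named key in turn holds the actual file
paths, which getInputFiles()/getDependencyFiles() flatten. A sketch of how a
caller might populate the task metadata (key names and paths are illustrative):

    Metadata m = new Metadata();
    // name the keys that carry the real file lists
    m.addMetadata(HadoopRunner.INPUT_FILE_KEYS, "L1BFiles,CalFiles");
    m.addMetadata("L1BFiles", "/data/in/granule1.h5");
    m.addMetadata("L1BFiles", "/data/in/granule2.h5");
    m.addMetadata("CalFiles", "/data/cal/coeffs.dat");
    m.addMetadata(HadoopRunner.DEPENDENCY_FILE_KEYS, "ConfigFiles");
    m.addMetadata("ConfigFiles", "/data/config/l2.config");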

Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,23 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+import com.thoughtworks.xstream.XStream;
+
+public class HadoopTaskInstance extends Mapper<Text, Text, Text, File> {
+
+	public static final String XSTREAM_TASK = "hadoop.cas.xstream.task";
+	
+	public void map(Text key, Text value, Context context)
+		throws IOException, InterruptedException {
+		XStream xstream = new XStream(); // HadoopRunner ships the serialized task in the job configuration
+		TaskInstance taskInstance = (TaskInstance) xstream.fromXML(context.getConfiguration().get(XSTREAM_TASK));
+		taskInstance.execute();
+	}
+	
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
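
The handoff between HadoopRunner and this mapper is a plain XStream round
trip: the runner serializes the TaskInstance to XML and stores it under
XSTREAM_TASK in the job configuration, and the mapper rebuilds and runs it on
the remote node. The mechanism in isolation (task, job, and context stand for
the objects in scope on each side):

    XStream xstream = new XStream();
    String xml = xstream.toXML(task);                                  // runner side
    job.getConfiguration().set(HadoopTaskInstance.XSTREAM_TASK, xml);  // travels with the job
    // ... later, inside the mapper on a task node ...
    TaskInstance rebuilt = (TaskInstance)
        xstream.fromXML(context.getConfiguration().get(HadoopTaskInstance.XSTREAM_TASK));
    rebuilt.execute();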

Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,117 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineClient;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineClientFactory;
+import org.apache.oodt.cas.workflow.server.channel.xmlrpc.XmlRpcCommunicationChannelClientFactory;
+import org.apache.oodt.cas.workflow.state.done.FailureState;
+import org.apache.oodt.cas.workflow.state.done.StoppedState;
+import org.apache.oodt.cas.workflow.state.done.SuccessState;
+import org.apache.oodt.cas.workflow.state.running.ExecutingState;
+
+public class HadoopTaskOutputCommitter extends OutputCommitter {
+
+	@Override
+	public void abortTask(TaskAttemptContext taskContext) throws IOException {
+		new File(taskContext.getWorkingDirectory().toString()).delete();
+	}
+
+	@Override
+	public void commitTask(TaskAttemptContext taskContext) throws IOException {
+		new File(taskContext.getWorkingDirectory().toString()).delete();
+	}
+
+	@Override
+	public boolean needsTaskCommit(TaskAttemptContext taskContext)
+			throws IOException {
+		return taskContext.getWorkingDirectory() != null;
+	}
+
+	@Override
+	public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+		super.abortJob(jobContext, state);
+		this.cleanupJobFiles(jobContext);
+		try {
+			WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+			if (state.equals(JobStatus.State.FAILED))
+				wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new FailureState(""));
+			else if (state.equals(JobStatus.State.KILLED))
+				wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new StoppedState(""));
+		}catch (Exception e) {
+			throw new IOException("Failed to update workflow state on job abort : " + e.getMessage(), e);
+		}
+	}
+	
+	@Override
+	public void commitJob(JobContext jobContext) throws IOException {
+		super.commitJob(jobContext);
+		this.cleanupJobFiles(jobContext);
+		try {
+			WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+			wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new SuccessState(""));
+		}catch (Exception e) {
+			throw new IOException("Failed to update workflow state on job commit : " + e.getMessage(), e);
+		}
+	}
+	
+	private void cleanupJobFiles(JobContext jobContext) throws IOException {
+		try {
+			Cluster cluster = new Cluster(jobContext.getConfiguration());
+			for (String inputFile : jobContext.getConfiguration().get(HadoopTaskProperties.INPUT_FILES).split(","))
+				cluster.getFileSystem().delete(new Path(inputFile), false);
+		    for (String dependencyFile : jobContext.getConfiguration().get(HadoopTaskProperties.DEPENDENCY_FILES).split(",")) 
+			    cluster.getFileSystem().delete(new Path(dependencyFile), false);
+		}catch (Exception e) {
+			throw new IOException("Failed to cleanup job files : " + e.getMessage(), e);
+		}
+	}
+	
+	@Override
+	public void setupJob(JobContext jobContext) throws IOException {
+		try {
+		    Cluster cluster = new Cluster(jobContext.getConfiguration());
+		    for (String inputFile : jobContext.getConfiguration().get(HadoopTaskProperties.INPUT_FILES).split(",")) {
+		    	Path inputFilePath = new Path(inputFile);
+			    cluster.getFileSystem().copyFromLocalFile(inputFilePath, inputFilePath);
+		    }
+		    for (String dependencyFile : jobContext.getConfiguration().get(HadoopTaskProperties.DEPENDENCY_FILES).split(",")) {
+		    	Path dependencyFilePath = new Path(dependencyFile);
+			    cluster.getFileSystem().copyFromLocalFile(dependencyFilePath, dependencyFilePath);
+		    }
+		    try {
+		    	WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+		    	wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new ExecutingState(""));
+		    }catch (Exception e) { /* state update is best-effort; job setup continues */ }
+		}catch (Exception e) {
+			throw new IOException("Failed to setup job '" + jobContext.getJobName() + "' : " + e.getMessage(), e);
+		}
+	}
+
+	@Override
+	public void setupTask(TaskAttemptContext taskContext) throws IOException {
+		new File(taskContext.getWorkingDirectory().toString()).mkdirs();
+	}
+	
+	private WorkflowEngineClient getWorkflowClient(JobContext jobContext) {
+	    XmlRpcCommunicationChannelClientFactory xmlrpcFactory = new XmlRpcCommunicationChannelClientFactory();
+	    xmlrpcFactory.setChunkSize(1024);
+	    xmlrpcFactory.setConnectionRetries(10);
+	    xmlrpcFactory.setConnectionRetryIntervalSecs(30);
+	    xmlrpcFactory.setConnectionTimeout(60);
+	    xmlrpcFactory.setRequestTimeout(20);
+	    xmlrpcFactory.setServerUrl(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_URL));
+	    WorkflowEngineClientFactory wengineFactory = new WorkflowEngineClientFactory();
+	    wengineFactory.setAutoPagerSize(1000);
+	    wengineFactory.setCommunicationChannelClientFactory(xmlrpcFactory);
+	    return wengineFactory.createEngine();
+	}
+
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
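
For this committer's callbacks to reach the workflow engine, the submitting
side has to stamp the engine coordinates into the job configuration; that
wiring isn't in this commit yet, so the sketch below is an assumption about
what it would look like (the URL is illustrative):

    Configuration conf = job.getConfiguration();
    conf.set(HadoopTaskProperties.WORKFLOW_URL, "http://wengine-host:9001");
    conf.set(HadoopTaskProperties.WORKFLOW_INSTANCE_ID, workflowInstance.getInstanceId());
    conf.set(HadoopTaskProperties.WORKFLOW_MODEL_ID, workflowInstance.getModelId());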

Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,37 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class HadoopTaskOutputFormat extends OutputFormat<Text, Text> {
+
+	@Override
+	public void checkOutputSpecs(JobContext context) throws IOException,
+			InterruptedException {
+		// no output specs to check: task output flows through the workflow engine, not HDFS paths
+	}
+
+	@Override
+	public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+			throws IOException, InterruptedException {
+		return new HadoopTaskOutputCommitter();
+	}
+
+	@Override
+	public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+			throws IOException, InterruptedException {
+		// task results are reported back through the workflow engine, so any
+		// records emitted here are simply discarded
+		return new RecordWriter<Text, Text>() {
+			public void write(Text key, Text value) {}
+			public void close(TaskAttemptContext context) {}
+		};
+	}
+
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,14 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+public class HadoopTaskProperties {
+
+	public static final String WORKFLOW_URL = "hadoop.cas.task.wm.url";
+	public static final String WORKFLOW_INSTANCE_ID = "hadoop.cas.task.wm.instance.id";
+	public static final String WORKFLOW_MODEL_ID = "hadoop.cas.task.wm.model.id";
+
+	public static final String DEPENDENCY_FILES = "hadoop.cas.task.dependency.files";
+	public static final String INPUT_FILES = "hadoop.cas.task.input.files";
+	
+	private HadoopTaskProperties() {}
+	
+}

Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain