Posted to commits@oodt.apache.org by bf...@apache.org on 2011/01/28 01:18:17 UTC
svn commit: r1064378 - in /oodt/branches/wengine-branch/wengine: ./
src/main/java/org/apache/oodt/cas/workflow/engine/runner/
Author: bfoster
Date: Fri Jan 28 00:18:17 2011
New Revision: 1064378
URL: http://svn.apache.org/viewvc?rev=1064378&view=rev
Log:
- beginnings of plugging wengine into Hadoop
------------------------
Added:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java (with props)
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java (with props)
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java (with props)
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java (with props)
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java (with props)
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java (with props)
Modified:
oodt/branches/wengine-branch/wengine/pom.xml
Modified: oodt/branches/wengine-branch/wengine/pom.xml
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/pom.xml?rev=1064378&r1=1064377&r2=1064378&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/pom.xml (original)
+++ oodt/branches/wengine-branch/wengine/pom.xml Fri Jan 28 00:18:17 2011
@@ -170,6 +170,11 @@
<version>2.6.2</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapred</artifactId>
+ <version>0.21.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.2</version>
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,13 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+public interface HadoopMapReduceable {
+
+ public String getMapperClass();
+
+ public String getCombinerClass();
+
+ public String getReducerClass();
+
+ public int getNumOfReducers();
+
+}
Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopMapReduceable.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
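For context: HadoopRunner (added below) checks whether a task's TaskInstance implements this interface and, if so, reflectively loads the named mapper/combiner/reducer classes and configures the job with them. A minimal sketch of an implementor follows; the class name and the fully-qualified names it returns are illustrative, not part of this commit:

    package org.apache.oodt.cas.workflow.engine.runner;

    // Hypothetical implementor; in practice these methods would be mixed into a
    // TaskInstance subclass, since HadoopRunner tests the running task instance.
    public class SketchMapReduceTask implements HadoopMapReduceable {

        public String getMapperClass() {
            return "com.example.TokenizerMapper"; // illustrative class name
        }

        public String getCombinerClass() {
            return "com.example.SumCombiner"; // illustrative class name
        }

        public String getReducerClass() {
            return "com.example.SumReducer"; // illustrative class name
        }

        public int getNumOfReducers() {
            return 4; // number of reduce tasks the job will request
        }
    }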
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,111 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+import com.thoughtworks.xstream.XStream;
+
+public class HadoopRunner extends EngineRunner {
+
+ public static final String DEPENDENCY_FILE_KEYS = "HadoopRunner/DependencyFileKeys";
+ public static final String INPUT_FILE_KEYS = "HadoopRunner/InputFileKeys";
+
+ @Override
+ public void execute(TaskInstance workflowInstance) throws Exception {
+ Metadata instanceMetadata = workflowInstance.getMetadata();
+ Configuration conf = new Configuration();
+ Cluster cluster = new Cluster(conf);
+ Job job = Job.getInstance(cluster);
+ job.setJobName(workflowInstance.getModelId());
+ job.setJarByClass(HadoopRunner.class);
+
+ //setup input files
+ List<String> inputFiles = this.getInputFiles(instanceMetadata);
+ for (String inputFile : inputFiles)
+ FileInputFormat.addInputPath(job, new Path(inputFile));
+ // set on the job's own Configuration: Job copies the conf at creation, and
+ // HadoopTaskOutputCommitter reads these keys from the job's configuration
+ job.getConfiguration().set(HadoopTaskProperties.INPUT_FILES, StringUtils.join(inputFiles, ","));
+
+ //setup deps files
+ List<String> dependencyFiles = this.getDependencyFiles(instanceMetadata);
+ for (String dependencyFile : dependencyFiles)
+ job.addCacheFile(new Path(dependencyFile).toUri());
+ job.getConfiguration().set(HadoopTaskProperties.DEPENDENCY_FILES, StringUtils.join(dependencyFiles, ","));
+
+ if (workflowInstance instanceof HadoopMapReduceable) {
+ HadoopMapReduceable mapReduceable = (HadoopMapReduceable) workflowInstance;
+ job.setMapperClass((Class<? extends Mapper>) Class.forName(mapReduceable.getMapperClass()));
+ job.setCombinerClass((Class<? extends Reducer>) Class.forName(mapReduceable.getCombinerClass()));
+ job.setReducerClass((Class<? extends Reducer>) Class.forName(mapReduceable.getReducerClass()));
+ job.setNumReduceTasks(mapReduceable.getNumOfReducers());
+ job.setOutputFormatClass(HadoopTaskOutputFormat.class);
+ } else {
+ job.setMapperClass(HadoopTaskInstance.class);
+ job.setNumReduceTasks(0);
+ XStream xstream = new XStream();
+ String xstreamTask = "/temp/job-input/xstream/" + workflowInstance.getInstanceId() + "/" + workflowInstance.getModelId() + ".xstream";
+ String xmlTask = xstream.toXML(workflowInstance);
+ // write the serialized task to the cluster filesystem so map tasks can read it
+ FSDataOutputStream taskOut = cluster.getFileSystem().create(new Path(xstreamTask));
+ taskOut.write(xmlTask.getBytes("UTF-8"));
+ taskOut.close();
+ FileInputFormat.addInputPath(job, new Path(xstreamTask));
+ }
+
+ job.submit();
+ }
+
+ private List<String> getInputFiles(Metadata metadata) throws Exception {
+ Vector<String> inputFiles = new Vector<String>();
+ for (String inputFileKey : metadata.getMetadata(INPUT_FILE_KEYS).split(","))
+ inputFiles.addAll(metadata.getAllMetadata(inputFileKey));
+ return inputFiles;
+ }
+
+ private List<String> getDependencyFiles(Metadata metadata) throws Exception {
+ Vector<String> dependencyFiles = new Vector<String>();
+ for (String dependencyFileKey : metadata.getMetadata(DEPENDENCY_FILE_KEYS).split(","))
+ dependencyFiles.addAll(metadata.getAllMetadata(dependencyFileKey));
+ return dependencyFiles;
+ }
+
+ // Hadoop schedules its own tasks, so this runner always reports
+ // unlimited capacity (see also hasOpenSlots below)
+ @Override
+ public int getOpenSlots(TaskInstance workflowInstance) throws Exception {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public boolean hasOpenSlots(TaskInstance workflowInstance) throws Exception {
+ return true;
+ }
+
+// public static class MapperTask extends Mapper<Object, Text, Text, File> {
+// private Text outputFilesText = new Text("OutputFiles");
+//
+// public void map(Object key, Text value, Context context)
+// throws IOException, InterruptedException {
+// Path[] localFiles = context.getLocalCacheFiles();
+// File outputFile = new File("/path/to/output/file");
+// int returnValue = ExecUtils.callProgram("/usr/bin/time -v -o log/runtime_" + value.toString() + ".txt /path/to/exe config/${config_type}/l2_fp.config output/l2_" + value.toString() + ".h5) > log/l2_" + value.toString() + ".log.running 2>&1", (File) null);
+// ExecUtils.callProgram("mv -f log/l2_" + value.toString() + ".log.running log/l2_" + value.toString() + ".log", (File) null);
+// context.write(outputFilesText, outputFile);
+// }
+// }
+//
+// public static class ReducerTask extends Reducer<Text, File, Text, File> {
+// private Text outputFileText = new Text("OutputFile");
+//
+// public void reduce(Text key, Iterable<File> values,
+// Context context) throws IOException, InterruptedException {
+// File outputFile = new File("/path/to/output/concat/file");
+// context.write(outputFileText, outputFile);
+// }
+// }
+
+}
Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopRunner.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
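The runner resolves files through one level of indirection: the INPUT_FILE_KEYS and DEPENDENCY_FILE_KEYS metadata fields each hold a comma-separated list of other metadata key names, and the (possibly multi-valued) values of those keys are the actual file paths. A sketch of the metadata a caller would populate before execute(); the key structure is from this commit, while the specific key names and paths are illustrative:

    import org.apache.oodt.cas.metadata.Metadata;

    public class HadoopRunnerMetadataSketch {

        public static Metadata buildExampleMetadata() {
            Metadata met = new Metadata();
            // INPUT_FILE_KEYS names the metadata keys that carry input file paths...
            met.addMetadata(HadoopRunner.INPUT_FILE_KEYS, "GranuleFiles,AncillaryFiles");
            // ...and each named key may hold several values
            met.addMetadata("GranuleFiles", "/data/granule1.h5");
            met.addMetadata("GranuleFiles", "/data/granule2.h5");
            met.addMetadata("AncillaryFiles", "/data/ephemeris.dat");
            // same pattern for files to ship via the distributed cache
            met.addMetadata(HadoopRunner.DEPENDENCY_FILE_KEYS, "ConfigFiles");
            met.addMetadata("ConfigFiles", "/config/task.config");
            return met;
        }
    }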
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,23 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+import com.thoughtworks.xstream.XStream;
+
+public class HadoopTaskInstance extends Mapper<Text, Text, Text, File> {
+
+ public static final String XSTREAM_TASK = "hadoop.cas.xstream.task";
+
+ public void map(Text key, Text value, Context context)
+ throws IOException, InterruptedException {
+ XStream xstream = new XStream();
+ // deserialize the TaskInstance stashed in the job configuration and run it
+ TaskInstance taskInstance = (TaskInstance) xstream.fromXML(context.getConfiguration().get(XSTREAM_TASK));
+ taskInstance.execute();
+ }
+
+}
Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskInstance.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
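Note the handoff here: HadoopRunner serializes the TaskInstance to a file under /temp/job-input/xstream and adds that file as an input path, while this mapper deserializes from the XSTREAM_TASK configuration property. In the current state the submitting side would therefore also need to stash the XML in the job configuration; a sketch of that wiring (assumed, not present in this commit):

    // assumed wiring on the submitting side, not in this commit: make the
    // XStream-serialized task visible where HadoopTaskInstance.map() reads it
    job.getConfiguration().set(HadoopTaskInstance.XSTREAM_TASK, xmlTask);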
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,117 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineClient;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineClientFactory;
+import org.apache.oodt.cas.workflow.server.channel.xmlrpc.XmlRpcCommunicationChannelClientFactory;
+import org.apache.oodt.cas.workflow.state.done.FailureState;
+import org.apache.oodt.cas.workflow.state.done.StoppedState;
+import org.apache.oodt.cas.workflow.state.done.SuccessState;
+import org.apache.oodt.cas.workflow.state.running.ExecutingState;
+
+public class HadoopTaskOutputCommitter extends OutputCommitter {
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ // NOTE: delete() ignores failures and will not remove a non-empty directory (same in commitTask below)
+ new File(taskContext.getWorkingDirectory().toString()).delete();
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ new File(taskContext.getWorkingDirectory().toString()).delete();
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException {
+ return taskContext.getWorkingDirectory() != null;
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ super.abortJob(jobContext, state);
+ this.cleanupJobFiles(jobContext);
+ try {
+ WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+ if (state.equals(JobStatus.State.FAILED))
+ wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new FailureState(""));
+ else if (state.equals(JobStatus.State.KILLED))
+ wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new StoppedState(""));
+ }catch (Exception e) {
+ throw new IOException("Failed to update workflow state on abort : " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ super.commitJob(jobContext);
+ this.cleanupJobFiles(jobContext);
+ try {
+ WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+ wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new SuccessState(""));
+ }catch (Exception e) {
+ throw new IOException("Failed to update workflow state on commit : " + e.getMessage(), e);
+ }
+ }
+
+ private void cleanupJobFiles(JobContext jobContext) throws IOException {
+ try {
+ Cluster cluster = new Cluster(jobContext.getConfiguration());
+ for (String inputFile : jobContext.getConfiguration().get(HadoopTaskProperties.INPUT_FILES).split(","))
+ cluster.getFileSystem().delete(new Path(inputFile), false);
+ for (String dependencyFile : jobContext.getConfiguration().get(HadoopTaskProperties.DEPENDENCY_FILES).split(","))
+ cluster.getFileSystem().delete(new Path(dependencyFile), false);
+ }catch (Exception e) {
+ throw new IOException("Failed to cleanup job files : " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ try {
+ Cluster cluster = new Cluster(jobContext.getConfiguration());
+ for (String inputFile : jobContext.getConfiguration().get(HadoopTaskProperties.INPUT_FILES).split(",")) {
+ Path inputFilePath = new Path(inputFile);
+ cluster.getFileSystem().copyFromLocalFile(inputFilePath, inputFilePath);
+ }
+ for (String dependencyFile : jobContext.getConfiguration().get(HadoopTaskProperties.DEPENDENCY_FILES).split(",")) {
+ Path dependencyFilePath = new Path(dependencyFile);
+ cluster.getFileSystem().copyFromLocalFile(dependencyFilePath, dependencyFilePath);
+ }
+ try {
+ WorkflowEngineClient wmClient = this.getWorkflowClient(jobContext);
+ wmClient.setWorkflowState(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_INSTANCE_ID), jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_MODEL_ID), new ExecutingState(""));
+ }catch (Exception e) {
+ // best-effort state update; a failure here should not abort job setup
+ }
+ }catch (Exception e) {
+ throw new IOException("Failed to setup job '" + jobContext.getJobName() + "' : " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ new File(taskContext.getWorkingDirectory().toString()).mkdirs();
+ }
+
+ private WorkflowEngineClient getWorkflowClient(JobContext jobContext) {
+ XmlRpcCommunicationChannelClientFactory xmlrpcFactory = new XmlRpcCommunicationChannelClientFactory();
+ xmlrpcFactory.setChunkSize(1024);
+ xmlrpcFactory.setConnectionRetries(10);
+ xmlrpcFactory.setConnectionRetryIntervalSecs(30);
+ xmlrpcFactory.setConnectionTimeout(60);
+ xmlrpcFactory.setRequestTimeout(20);
+ xmlrpcFactory.setServerUrl(jobContext.getConfiguration().get(HadoopTaskProperties.WORKFLOW_URL));
+ WorkflowEngineClientFactory wengineFactory = new WorkflowEngineClientFactory();
+ wengineFactory.setAutoPagerSize(1000);
+ wengineFactory.setCommunicationChannelClientFactory(xmlrpcFactory);
+ return wengineFactory.createEngine();
+ }
+
+}
Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputCommitter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
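Taken together, the committer maps Hadoop job lifecycle events back onto wengine workflow states (as implemented above):

    setupJob                 -> ExecutingState (after staging input and dependency files)
    commitJob                -> SuccessState   (after deleting the staged files)
    abortJob, State.FAILED   -> FailureState
    abortJob, State.KILLED   -> StoppedState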
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,32 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class HadoopTaskOutputFormat extends OutputFormat<Text, Text> {
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException,
+ InterruptedException {
+ // TODO: validate output specs (currently a no-op)
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new HadoopTaskOutputCommitter();
+ }
+
+ @Override
+ public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ // WIP: tasks here produce output as side effects; a no-op RecordWriter is
+ // likely needed, since returning null will NPE if a writer is ever requested
+ return null;
+ }
+
+}
Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskOutputFormat.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java?rev=1064378&view=auto
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java (added)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java Fri Jan 28 00:18:17 2011
@@ -0,0 +1,14 @@
+package org.apache.oodt.cas.workflow.engine.runner;
+
+public class HadoopTaskProperties {
+
+ public static final String WORKFLOW_URL = "hadoop.cas.task.wm.url";
+ public static final String WORKFLOW_INSTANCE_ID = "hadoop.cas.task.wm.instance.id";
+ public static final String WORKFLOW_MODEL_ID = "hadoop.cas.task.wm.model.id";
+
+ public static final String DEPENDENCY_FILES = "hadoop.cas.task.dependency.files";
+ public static final String INPUT_FILES = "hadoop.cas.task.input.files";
+
+ private HadoopTaskProperties() {}
+
+}
Propchange: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/HadoopTaskProperties.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
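These property names are the contract between the submitting client and the Hadoop-side classes: they tell HadoopTaskOutputCommitter where the workflow manager lives and which workflow instance to report state changes against. A sketch of a driver setting them before job submission; the property names are from this commit, while the URL and ids are illustrative:

    import org.apache.hadoop.conf.Configuration;

    public class HadoopTaskPropertiesSketch {

        public static void configure(Configuration conf) {
            // where HadoopTaskOutputCommitter connects back to the workflow manager
            conf.set(HadoopTaskProperties.WORKFLOW_URL, "http://localhost:9001");
            // which workflow instance/model the job's state changes apply to
            conf.set(HadoopTaskProperties.WORKFLOW_INSTANCE_ID, "example-instance-id");
            conf.set(HadoopTaskProperties.WORKFLOW_MODEL_ID, "ExampleModelId");
        }
    }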