Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/23 17:38:31 UTC

[47/51] [partial] incubator-taverna-engine git commit:

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaMapper.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaMapper.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaMapper.java
new file mode 100644
index 0000000..132af05
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaMapper.java
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+public class TavernaMapper extends org.apache.hadoop.mapreduce.Mapper<int[], Map<String, Path>, Object, Object> {
+
+	private Context context;
+
+	@Override
+	protected void setup(Context context) throws IOException,
+			InterruptedException {
+		this.context = context;
+	}
+	
+	@Override
+	protected void map(int[] key, Map<String, Path> value, 
+              Context context) throws IOException, InterruptedException {
+
+		// Value contains a map of input ports to data values on those ports
+		// (i.e. file paths to data on the input ports)
+
+		// Get the activity and invoke it with the passed inputs per port
+//		
+//		String activityClassName = context.getConfiguration().get("taverna.activity.class");
+//		String activityConfigurationXML = context.getConfiguration().get("taverna.activity.configuration");
+//	
+//	    ClassLoader classLoader = this.getClass().getClassLoader();
+//	    Class<?> activityClass = null;
+//	    AbstractAsynchronousActivity<?> activity = null;
+//	    try {
+//	        activityClass = classLoader.loadClass(activityClassName);
+//	        activity = (AbstractAsynchronousActivity<?>) activityClass.newInstance();
+//	    } catch (ClassNotFoundException e) {
+//	        // The activity class is not on the classpath
+//	        e.printStackTrace();
+//	    } catch (InstantiationException e) {
+//	        // The activity class is abstract or has no no-arg constructor
+//	        e.printStackTrace();
+//	    } catch (IllegalAccessException e) {
+//	        // The activity class or its constructor is not accessible
+//	        e.printStackTrace();
+//	    }
+//	    
+//	    activity.configure(activityConfigurationXML);
+//	    activity.executeAsynch(data, callback);
+
+		System.out.println("Index: " + Arrays.toString(key));
+
+		// Iterate over the input ports and the file paths holding their values
+		for (Map.Entry<String, Path> input : value.entrySet()) {
+			String inputPortName = input.getKey();
+			Path inputFilePath = input.getValue();
+			// Open the value file via the job configuration's file system
+			// (reading and concatenating the values is not implemented yet)
+			FSDataInputStream fileInputStream = inputFilePath.getFileSystem(context.getConfiguration()).open(inputFilePath);
+			fileInputStream.close();
+			System.out.println("Input port: " + inputPortName + ". Input value: " + inputFilePath + ".");
+		}
+		
+		// Map of output ports to data values on those ports 
+		// (i.e. file paths to data on the output ports)
+		Map<String, Path> outputValue = new HashMap<String, Path>();
+		context.write(key, outputValue);
+	}
+	  
+}
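
The commented-out block in map() above sketches how the mapper was intended to
load a Taverna activity reflectively from the job configuration and configure
it before invoking it with the per-port inputs. Below is a minimal,
self-contained sketch of that reflective-loading pattern; the
ConfigurableActivity interface and the class name "org.example.MyActivity" are
hypothetical placeholders standing in for the engine's
AbstractAsynchronousActivity, which is not part of this module.

    // Hypothetical stand-in for the Taverna activity type in the sketch above
    interface ConfigurableActivity {
        void configure(String configurationXML);
    }

    public class ActivityLoaderExample {
        public static void main(String[] args) {
            String activityClassName = "org.example.MyActivity"; // hypothetical
            String activityConfigurationXML = "<config/>";       // hypothetical
            ClassLoader classLoader = ActivityLoaderExample.class.getClassLoader();
            try {
                // Load the activity class by name and create an instance
                Class<?> activityClass = classLoader.loadClass(activityClassName);
                ConfigurableActivity activity =
                        (ConfigurableActivity) activityClass.newInstance();
                activity.configure(activityConfigurationXML);
            } catch (ClassNotFoundException e) {
                System.err.println("Activity class not found: " + activityClassName);
            } catch (InstantiationException e) {
                System.err.println("Activity class could not be instantiated: " + e);
            } catch (IllegalAccessException e) {
                System.err.println("Activity class is not accessible: " + e);
            }
        }
    }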

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
new file mode 100644
index 0000000..613f944
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
@@ -0,0 +1,105 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * A record reader that turns each file in the split's directory into a single
+ * record: the key is the file's numeric name and the value is a map carrying
+ * the datalink "tag" and the file path as the "record".
+ *
+ * @author David Withers
+ */
+public class TavernaRecordReader extends RecordReader<LongWritable, MapWritable> {
+
+	private FileSplit fileSplit;
+	private String recordName;
+	private FileStatus[] files;
+	private int index = -1;
+	private Map<String, String> datalinks;
+
+	@Override
+	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+		fileSplit = (FileSplit) split;
+		Path path = fileSplit.getPath();
+		recordName = path.getName();
+		files = path.getFileSystem(context.getConfiguration()).listStatus(path);
+		setDatalinks(context);
+	}
+
+	/**
+	 * Parses the comma-separated "taverna.datalinks" configuration property,
+	 * where each entry has the form source|target, into the datalinks map.
+	 *
+	 * @param context the task attempt context that holds the job configuration
+	 */
+	private void setDatalinks(TaskAttemptContext context) {
+		datalinks = new HashMap<String, String>();
+		String datalinkConfig = context.getConfiguration().get("taverna.datalinks");
+		if (datalinkConfig != null) {
+			String[] datalinksSplit = datalinkConfig.split(",");
+			for (String datalink : datalinksSplit) {
+				String[] split = datalink.split("\\|");
+				if (split.length == 2) {
+					datalinks.put(split[0], split[1]);
+				}
+			}
+		}
+	}
+
+	@Override
+	public boolean nextKeyValue() throws IOException, InterruptedException {
+		index++;
+		return index < files.length;
+	}
+
+	@Override
+	public LongWritable getCurrentKey() throws IOException, InterruptedException {
+		return new LongWritable(Long.valueOf(files[index].getPath().getName()));
+	}
+
+	@Override
+	public MapWritable getCurrentValue() throws IOException, InterruptedException {
+		MapWritable mapWritable = new MapWritable();
+		mapWritable.put(new Text("tag"), new Text(datalinks.get(recordName)));
+		mapWritable.put(new Text("record"), new Text(files[index].getPath().toString()));
+		return mapWritable;
+	}
+
+	@Override
+	public float getProgress() throws IOException, InterruptedException {
+		// Cast to float to avoid integer division truncating the progress to 0
+		return files.length == 0 ? 1 : (float) (index + 1) / files.length;
+	}
+
+	@Override
+	public void close() throws IOException {
+
+	}
+
+
+}
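
The "taverna.datalinks" property consumed by setDatalinks() above is a
comma-separated list of source|target pairs; the test drivers later in this
commit set it to "A|X,B|Y". A standalone sketch of the same parsing logic,
outside of any Hadoop context:

    import java.util.HashMap;
    import java.util.Map;

    public class DatalinksExample {
        public static void main(String[] args) {
            // Same wire format as the "taverna.datalinks" job property
            String datalinkConfig = "A|X,B|Y";
            Map<String, String> datalinks = new HashMap<String, String>();
            for (String datalink : datalinkConfig.split(",")) {
                // '|' is a regex metacharacter, hence the escaping
                String[] split = datalink.split("\\|");
                if (split.length == 2) {
                    datalinks.put(split[0], split[1]);
                }
            }
            System.out.println(datalinks); // e.g. {A=X, B=Y}
        }
    }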

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaReducer.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaReducer.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaReducer.java
new file mode 100644
index 0000000..bf489db
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaReducer.java
@@ -0,0 +1,43 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+public class TavernaReducer extends
+		org.apache.hadoop.mapreduce.Reducer<int[], Map<String, Path>, Object, Object> {
+
+	private Context context;
+
+	@Override
+	protected void setup(Context context) throws IOException,
+			InterruptedException {
+		this.context = context;
+	}
+
+	@Override
+	protected void reduce(int[] key, Iterable<Map<String, Path>> values,
+			Context context) throws IOException, InterruptedException {
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/Test.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/Test.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/Test.java
new file mode 100644
index 0000000..a23be06
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/Test.java
@@ -0,0 +1,68 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Test driver that wires {@link TavernaInputFormat}, {@link TavernaMapper} and
+ * {@link TavernaReducer} into a Hadoop job and runs it via {@link ToolRunner}.
+ *
+ * @author David Withers
+ */
+public class Test extends Configured implements Tool {
+
+	@Override
+	public int run(String[] args) throws Exception {
+		Configuration configuration = getConf();
+		Job job = new Job(configuration);
+		job.setJarByClass(Test.class);
+		job.setJobName("taverna-test");
+
+		job.setOutputKeyClass(int[].class);
+		job.setOutputValueClass(Map.class);
+
+		job.setMapperClass(TavernaMapper.class);
+//		job.setCombinerClass(Reduce.class);
+		job.setReducerClass(TavernaReducer.class);
+
+		job.setInputFormatClass(TavernaInputFormat.class);
+		job.setOutputFormatClass(TextOutputFormat.class);
+
+		TavernaInputFormat.setInputPaths(job, new Path(args[0]));
+		FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+		boolean success = job.waitForCompletion(true);
+		return success ? 0 : 1;
+	}
+
+	public static void main(String[] args) throws Exception {
+		int ret = ToolRunner.run(new Test(), args);
+		System.exit(ret);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TextArrayWritable.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
new file mode 100644
index 0000000..4c5c115
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Text;
+
+public class TextArrayWritable extends ArrayWritable {
+	public TextArrayWritable() {
+		super(Text.class);
+	}
+
+}
\ No newline at end of file
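
TextArrayWritable follows the standard Hadoop idiom of subclassing
ArrayWritable to fix the element type, which lets the framework create
instances reflectively during deserialization. A brief usage sketch, assuming
this class and hadoop-common are on the classpath:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class TextArrayWritableExample {
        public static void main(String[] args) {
            TextArrayWritable writable = new TextArrayWritable();
            // ArrayWritable stores a Writable[]; here every element is a Text
            writable.set(new Text[] { new Text("first"), new Text("second") });
            for (Writable value : writable.get()) {
                System.out.println(value);
            }
        }
    }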

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputFormat.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputFormat.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputFormat.java
deleted file mode 100644
index 122b473..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputFormat.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * An input format that receives an input directory containing a number of directories with input files 
- * for each input port to a Taverna processor/activity that will be executed as part of this
- * MapReduce job. Mapping between directory name -> Taverna processor/activity input port name
- * is carried in the job's Context.
- * 
- * @author Alex Nenadic
- *
- */
-public class CrossProductInputFormat extends
-		FileInputFormat<Text, TextArrayWritable> {
-
-	private static final Log Logger = LogFactory.getLog(CrossProductInputFormat.class);
-
-	// Do not split files into blocks
-	@Override
-	protected boolean isSplitable(JobContext context, Path filename) {
-		return false;
-	}
-
-	@Override
-	public RecordReader<Text, TextArrayWritable> createRecordReader(
-			InputSplit split, TaskAttemptContext context) {
-		return new CrossProductRecordReader();
-	}
-
-	@Override
-	public List<InputSplit> getSplits(JobContext job) throws IOException {
-
-	    // Generate splits. Split is a list of directories where each directory 
-		// contains inputs for one input port of the Taverna processor/activity we 
-		// are invoking. 
-		// We will have only one split for cross product that will know about all
-		// the files in all input directories and will generate RecordReaders 
-		// for every combination of files inside these directories.
-//	    CrossProductInputSplit split = new CrossProductInputSplit();
-	    
-	    // List the input port directories contained in the input directory passed 
-	    // in from the command line.
-	    List<FileStatus> inputPortDirectories = listStatus(job); 
-	    
-		final FileSystem fs = job.getWorkingDirectory().getFileSystem(job.getConfiguration());
-		Path workingDirectory = job.getWorkingDirectory();
-		System.out.println("Working directory: " + workingDirectory);
-		System.out.println("Adding directories to the cross product split:");
-		ArrayList<Path> inputPortDirectoriesPaths = new ArrayList<Path>();
-    	for (FileStatus inputPortDirectory : inputPortDirectories){
-    		// TODO input port directories need to be ordered in the order of the 
-    		// input ports of the Taverna processor/activity they are going into
-            
-    		//inputPortDirectoriesPaths.add(new Text(inputPortDirectory.getPath().toString()));
-    		inputPortDirectoriesPaths.add(inputPortDirectory.getPath());
-    		System.out.println(inputPortDirectory.getPath());
-
-    	}
-	    CrossProductInputSplit split = new CrossProductInputSplit(workingDirectory, inputPortDirectoriesPaths);
-	    
-
-	    List<InputSplit> splits = new ArrayList<InputSplit>();
-	    splits.add(split);
-		
-	    return splits;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java
deleted file mode 100644
index 2ff9113..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/**
- *
- *
- * @author Alex Nenadic
- */
-public class CrossProductInputSplit extends FileSplit {
-	//
-	// private long length = 0;
-	// private String[] hosts;
-	private List<Path> inputPortDirectories;
-	private Path workingDirectory;
-
-	public CrossProductInputSplit() {
-		super(null,0,0,null);
-		inputPortDirectories = new ArrayList<Path>();
-		System.out.println("Calling default constructor for cross product split");
-	}
-
-	public CrossProductInputSplit(Path workingDirectory, List<Path> inputPortDirectories) {
-		// this.length = length;
-		// this.hosts = hosts;
-		super(workingDirectory, 0, 0, new String[0]);
-		this.workingDirectory = workingDirectory;
-		this.inputPortDirectories = inputPortDirectories;
-		System.out.println("Calling non-default constructor for cross product split");
-	}
-
-	public void addInputPortDirectory(Path path) {
-		inputPortDirectories.add(path);
-	}
-
-	public List<Path> getInputPortDirectories() {
-		return inputPortDirectories;
-	}
-
-	@Override
-	public void write(DataOutput out) throws IOException {
-		super.write(out);
-		Text.writeString(out, workingDirectory.toString());
-		out.writeInt(inputPortDirectories.size());
-		for (Path path : inputPortDirectories) {
-			Text.writeString(out, path.toString());
-		}
-	}
-
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		workingDirectory = new Path(Text.readString(in));
-		int length = in.readInt();
-		for (int i = 0; i < length; i++) {
-			inputPortDirectories.add(new Path(Text.readString(in)));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java
deleted file mode 100644
index 6602f55..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.pingel.util.CrossProduct;
-
-public class CrossProductRecordReader extends RecordReader<Text, TextArrayWritable>{
-
-	private static final Log Logger = LogFactory.getLog(CrossProductRecordReader.class);
-
-	// Input directories (one for each port) containing files that are used 
-	// as inputs to Taverna processor/activity
-	private List<Path> inputPortDirectories;
-	
-	private CrossProduct<String> crossProduct ;
-
-	private Iterator<List<String>> crossProductIterator;
-
-	private List<String> currentIndexes;
-
-	@Override
-	public void initialize(InputSplit split, TaskAttemptContext context)
-			throws IOException, InterruptedException {
-
-		System.out.println("Inside record reader's initialize");
-		
-		CrossProductInputSplit crossProductSplit = (CrossProductInputSplit)split;
-		inputPortDirectories = crossProductSplit.getInputPortDirectories();
-		System.out.println("Record reader received " + inputPortDirectories.size() + " input port directories");
-
-		List<List<String>> iterables = new ArrayList<List<String>>();
-		for (int i=0; i<inputPortDirectories.size();i++ ){
-	
-			Path inputPortDirectory = inputPortDirectories.get(i);
-			//Path inputPortDirectory = inputPortDirectories.get(i);
-			FileStatus[] files = inputPortDirectory.getFileSystem(context.getConfiguration()).listStatus(inputPortDirectory);
-			List<String> fileNames = new ArrayList<String>();
-			for (FileStatus file : files){
-				fileNames.add(file.getPath().getName());
-			}
-			iterables.add(fileNames);
-		}
-
-		crossProduct = new CrossProduct<String>(iterables);
-		crossProductIterator = crossProduct.iterator();
-		
-	}
-
-	@Override
-	public boolean nextKeyValue(){
-
-		boolean hasNextKey = crossProductIterator.hasNext();
-		System.out.println("Has record reader next key value? " + hasNextKey);
-		if (hasNextKey){
-			currentIndexes = crossProductIterator.next();
-		}
-		return hasNextKey;
-	}
-
-	@Override
-	public Text getCurrentKey() throws IOException, InterruptedException {
-	
-		StringBuffer sb = new StringBuffer();
-		for (String index : currentIndexes){
-			sb.append(index + ".");
-		}
-		// Remove last "."
-		String indexesString = sb.toString();
-		System.out.println("Get current key: " + indexesString);
-		if (indexesString.contains(".")){
-			indexesString = indexesString.substring(0, indexesString.length() - 1);
-		}
-		return new Text(indexesString);
-	}
-
-	@Override
-	public TextArrayWritable getCurrentValue() {
-				
-		TextArrayWritable arrayWritable = new TextArrayWritable();
-		Text[] array = new Text[currentIndexes.size()];
-		for(int i= 0; i< currentIndexes.size(); i++){
-			Path file = new Path(inputPortDirectories.get(i).toString(), currentIndexes.get(i));
-			array[i] = new Text(file.toString());
-		}
-		arrayWritable.set(array);
-		return arrayWritable;
-	}
-
-	@Override
-	public float getProgress() throws IOException, InterruptedException {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public void close() throws IOException {
-		// TODO Auto-generated method stub
-		
-	}
-
-}
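
The record reader above delegates the enumeration of input combinations to
org.pingel.util.CrossProduct. For reference, the cross product it computes
over the per-port file-name lists can be sketched without that dependency as
follows; the file names are made-up examples:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CrossProductExample {
        // Enumerate every combination taking one element from each input list
        static List<List<String>> crossProduct(List<List<String>> iterables) {
            List<List<String>> result = new ArrayList<List<String>>();
            result.add(new ArrayList<String>());
            for (List<String> iterable : iterables) {
                List<List<String>> next = new ArrayList<List<String>>();
                for (List<String> partial : result) {
                    for (String item : iterable) {
                        List<String> combination = new ArrayList<String>(partial);
                        combination.add(item);
                        next.add(combination);
                    }
                }
                result = next;
            }
            return result;
        }

        public static void main(String[] args) {
            // Two input port directories with made-up file names
            List<List<String>> ports = Arrays.asList(
                    Arrays.asList("0", "1"),       // files for the first port
                    Arrays.asList("0", "1", "2")); // files for the second port
            for (List<String> combination : crossProduct(ports)) {
                System.out.println(combination); // [0, 0], [0, 1], ... [1, 2]
            }
        }
    }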

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java
deleted file mode 100644
index e5dc063..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class CrossProductTest extends Configured implements Tool {
-
-	public static class Map extends Mapper<Text, TextArrayWritable, Text, TextArrayWritable> {
-		public void map(Text key, TextArrayWritable value, Context context) throws IOException,
-				InterruptedException {
-			System.out.println("Map key = " + key);
-			System.out.println("Map value = " );
-
-			for (int i = 0; i < value.get().length; i++){
-				System.out.println("  " + value.get()[i]);
-			}
-
-			context.write(key, value);
-		}
-	}
-
-	public static class Reduce extends Reducer<Text, TextArrayWritable, Text, Text> {
-		public void reduce(Text key, Iterable<TextArrayWritable> values, Context context)
-				throws IOException, InterruptedException {
-
-			System.out.println("Reduce key = " + key);
-			context.write(key, f(values));
-		}
-
-		private Text f(Iterable<TextArrayWritable> values) {
-			StringBuilder sb = new StringBuilder();
-
-			// There should be only one array
-			TextArrayWritable arrayValue = values.iterator().next();
-
-			for (int i = 0; i < arrayValue.get().length; i++){
-				sb.append(arrayValue.get()[i] + "\nx");
-			}
-			String str = sb.toString();
-			if (str.contains("\nx")){
-				str = str.substring(0, sb.lastIndexOf("\nx") -1);
-			}
-			System.out.println("Result of function f(): " + str);
-
-			return new Text(str);
-		}
-	}
-
-	public int run(String[] args) throws Exception {
-
-		Configuration configuration = getConf();
-		configuration.set("taverna.datalinks", "A|X,B|Y");
-		System.out.println(configuration);
-		Job job = new Job(configuration);
-		job.setJarByClass(CrossProductTest.class);
-		job.setJobName("crossproduct");
-
-		job.setOutputKeyClass(Text.class);
-		job.setOutputValueClass(TextArrayWritable.class);
-
-		job.setMapperClass(Map.class);
-//		job.setCombinerClass(Reduce.class);
-		job.setReducerClass(Reduce.class);
-
-		job.setInputFormatClass(CrossProductInputFormat.class);
-		job.setOutputFormatClass(TextOutputFormat.class);
-
-		FileInputFormat.setInputPaths(job, new Path(args[0]));
-		System.out.println("Input dir: " + args[0]);
-		FileOutputFormat.setOutputPath(job, new Path(args[1]));
-		System.out.println("Output dir: " + args[1]);
-
-		boolean success = job.waitForCompletion(true);
-		return success ? 0 : 1;
-	}
-
-	public static void main(String[] args) throws Exception {
-		int ret = ToolRunner.run(new CrossProductTest(), args);
-		System.exit(ret);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java
deleted file mode 100644
index 2fa38e4..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class DotProductTest extends Configured implements Tool {
-
-	public static class Map extends Mapper<LongWritable, MapWritable, LongWritable, MapWritable> {
-		public void map(LongWritable key, MapWritable value, Context context) throws IOException,
-				InterruptedException {
-			System.out.println("Map key = " + key);
-			System.out.println("Map value tag = " + value.get(new Text("tag")));
-			System.out.println("Map value record = " + value.get(new Text("record")));
-			context.write(key, value);
-		}
-	}
-
-	public static class Reduce extends Reducer<LongWritable, MapWritable, LongWritable, Text> {
-		public void reduce(LongWritable key, Iterable<MapWritable> values, Context context)
-				throws IOException, InterruptedException {
-
-			System.out.println("Reduce key = " + key);
-			context.write(key, f(values));
-			context.write(key, f(values));
-		}
-
-		private Text f(Iterable<MapWritable> values) {
-			StringBuilder sb = new StringBuilder();
-			for (MapWritable value : values) {
-				System.out.println("Reduce tag = " + value.get(new Text("tag")));
-				System.out.println("Reduce value = " + value.get(new Text("record")));
-				sb.append(value.get(new Text("record")) + " ");
-			}
-			return new Text(sb.toString());
-		}
-	}
-
-	public int run(String[] args) throws Exception {
-		java.util.Map datalinks = new HashMap();
-
-
-		Configuration configuration = getConf();
-		configuration.set("taverna.datalinks", "A|X,B|Y");
-		System.out.println(configuration);
-		Job job = new Job(configuration);
-		job.setJarByClass(DotProductTest.class);
-		job.setJobName("dotproduct");
-
-		job.setOutputKeyClass(LongWritable.class);
-		job.setOutputValueClass(MapWritable.class);
-
-		job.setMapperClass(Map.class);
-//		job.setCombinerClass(Reduce.class);
-		job.setReducerClass(Reduce.class);
-
-		job.setInputFormatClass(TavernaInputFormat.class);
-		job.setOutputFormatClass(TextOutputFormat.class);
-
-		FileInputFormat.setInputPaths(job, new Path(args[0]));
-		FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
-		boolean success = job.waitForCompletion(true);
-		return success ? 0 : 1;
-	}
-
-	public static void main(String[] args) throws Exception {
-		int ret = ToolRunner.run(new DotProductTest(), args);
-		System.exit(ret);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java
deleted file mode 100644
index 57b41c5..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- *
- *
- * @author David Withers
- */
-public class TavernaInputFormat extends FileInputFormat<LongWritable, MapWritable> {
-
-	@Override
-	public RecordReader<LongWritable, MapWritable> createRecordReader(InputSplit split,
-			TaskAttemptContext context) throws IOException, InterruptedException {
-		return new TavernaRecordReader();
-	}
-
-	@Override
-	protected boolean isSplitable(JobContext context, Path filename) {
-		return false;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java
deleted file mode 100644
index d1bf0b3..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-/**
- *
- *
- * @author David Withers
- */
-public class TavernaInputSplit extends InputSplit {
-	private int[] index;
-	private Map<String, Path> inputs;
-	private long length;
-	private String[] hosts;
-
-	public TavernaInputSplit(int[] index, Map<String, Path> inputs, long length, String[] hosts) {
-		this.index = index;
-		this.inputs = inputs;
-		this.length = length;
-		this.hosts = hosts;
-	}
-
-	public int[] getIndex() {
-		return index;
-	}
-
-	public Map<String, Path> getInputs() {
-		return inputs;
-	}
-
-	@Override
-	public long getLength() throws IOException, InterruptedException {
-		return length;
-	}
-
-	@Override
-	public String[] getLocations() throws IOException, InterruptedException {
-		if (hosts == null) {
-			return new String[] {};
-		} else {
-			return this.hosts;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java
deleted file mode 100644
index fa5d1dc..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-
-public class TavernaMapper extends org.apache.hadoop.mapreduce.Mapper<int[], Map<String, Path>, Object, Object> {
-
-	private org.apache.hadoop.mapreduce.Mapper.Context context;
-
-	@Override
-	protected void setup(Context context) throws IOException,
-			InterruptedException {
-		this.context = context;
-	}
-	
-	@Override
-	protected void map(int[] key, Map<String, Path> value, 
-              Context context) throws IOException, InterruptedException {
-		
-		// Value contains a map of input ports to data values on those ports 
-		// (i.e. file paths to data on the input ports) 
-
-		
-		// Get the activity and invoke it with the passed inputs per port
-//		
-//		String activityClassName = context.getConfiguration().get("taverna.activity.class");
-//		String activityConfigurationXML = context.getConfiguration().get("taverna.activity.configuration");
-//	
-//	    ClassLoader classLoader = this.getClass().getClassLoader();
-//	    Class<?> activityClass = null;
-//	    AbstractAsynchronousActivity<?> activity = null;
-//	    try {
-//	        activityClass = classLoader.loadClass(activityClassName);
-//	        activity = (AbstractAsynchronousActivity<?>) activityClass.newInstance();
-//	    } catch (ClassNotFoundException e) {
-//			// TODO Auto-generated catch block
-//	        e.printStackTrace();
-//	    } catch (InstantiationException e) {
-//			// TODO Auto-generated catch block
-//			e.printStackTrace();
-//		} catch (IllegalAccessException e) {
-//			// TODO Auto-generated catch block
-//			e.printStackTrace();
-//		}
-//	    
-//	    activity.configure(activityConfigurationXML);
-//	    activity.executeAsynch(data, callback);
-
-		System.out.println("Index: " + key);
-
-		// Input port names
-		Iterator<String> iterator = value.keySet().iterator();
-		while(iterator.hasNext()){
-			String inputPortName = iterator.next();
-			// Simply read values from input files and concatenate them
-			Path inputFilePath = value.get(inputPortName);
-			FSDataInputStream fileInputStream = inputFilePath.getFileSystem(null).open(inputFilePath);
-			//fileInputStream.
-			System.out.println("Input port: " + inputPortName + ". Input value: "+ inputFilePath +".");
-		}
-		
-		// Map of output ports to data values on those ports 
-		// (i.e. file paths to data on the output ports)
-		Map<String, Path> outputValue = new HashMap<String, Path>();
-		context.write(key, outputValue);
-	}
-	  
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
deleted file mode 100644
index 190d91a..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/**
- *
- *
- * @author David Withers
- */
-public class TavernaRecordReader extends RecordReader<LongWritable, MapWritable> {
-
-	private FileSplit fileSplit;
-	private String recordName;
-	private FileStatus[] files;
-	private int index = -1;
-	private Map<String, String> datalinks;
-
-	@Override
-	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-		fileSplit = (FileSplit) split;
-		Path path = fileSplit.getPath();
-		recordName = path.getName();
-		files = path.getFileSystem(context.getConfiguration()).listStatus(path);
-		setDatalinks(context);
-	}
-
-	/**
-	 * @param context
-	 */
-	private void setDatalinks(TaskAttemptContext context) {
-		datalinks = new HashMap<String, String>();
-		String datalinkConfig = context.getConfiguration().get("taverna.datalinks");
-		if (datalinkConfig != null) {
-			String[] datalinksSplit = datalinkConfig.split(",");
-			for (String datalink : datalinksSplit) {
-				String[] split = datalink.split("\\|");
-				if (split.length == 2) {
-					datalinks.put(split[0], split[1]);
-				}
-			}
-		}
-	}
-
-	@Override
-	public boolean nextKeyValue() throws IOException, InterruptedException {
-		index++;
-		return index < files.length;
-	}
-
-	@Override
-	public LongWritable getCurrentKey() throws IOException, InterruptedException {
-		return new LongWritable(Long.valueOf(files[index].getPath().getName()));
-	}
-
-	@Override
-	public MapWritable getCurrentValue() throws IOException, InterruptedException {
-		MapWritable mapWritable = new MapWritable();
-		mapWritable.put(new Text("tag"), new Text(datalinks.get(recordName)));
-		mapWritable.put(new Text("record"), new Text(files[index].getPath().toString()));
-		return mapWritable;
-	}
-
-	@Override
-	public float getProgress() throws IOException, InterruptedException {
-		return files.length == 0 ? 1 : (index + 1) / files.length;
-	}
-
-	@Override
-	public void close() throws IOException {
-
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java
deleted file mode 100644
index 11f85dd..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-
-public class TavernaReducer extends
-		org.apache.hadoop.mapreduce.Reducer<int[], Map<String, Path>, Object, Object> {
-
-	private Context context;
-
-	@Override
-	protected void setup(Context context) throws IOException,
-			InterruptedException {
-		this.context = context;
-	}
-
-	@Override
-	protected void reduce(int[] key, Iterable<Map<String, Path>> values,
-			Context context) throws IOException, InterruptedException {
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java
deleted file mode 100644
index 65a75d8..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- *
- *
- * @author David Withers
- */
-public class Test extends Configured implements Tool {
-
-	@Override
-	public int run(String[] args) throws Exception {
-		Configuration configuration = getConf();
-		Job job = new Job(configuration);
-		job.setJarByClass(Test.class);
-		job.setJobName("wordcount");
-
-		job.setOutputKeyClass(int[].class);
-		job.setOutputValueClass(Map.class);
-
-		job.setMapperClass(TavernaMapper.class);
-//		job.setCombinerClass(Reduce.class);
-		job.setReducerClass(TavernaReducer.class);
-
-		job.setInputFormatClass(TavernaInputFormat.class);
-		job.setOutputFormatClass(TextOutputFormat.class);
-
-		TavernaInputFormat.setInputPaths(job, new Path(args[0]));
-		FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
-		boolean success = job.waitForCompletion(true);
-		return success ? 0 : 1;
-	}
-
-	public static void main(String[] args) throws Exception {
-		int ret = ToolRunner.run(new Test(), args);
-		System.exit(ret);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
deleted file mode 100644
index 1b64c77..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.Text;
-
-public class TextArrayWritable extends ArrayWritable {
-	public TextArrayWritable() {
-		super(Text.class);
-	}
-
-}
\ No newline at end of file

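TextArrayWritable above is the standard thin ArrayWritable subclass: fixing the element type to Text in the no-arg constructor is what allows Hadoop to re-instantiate and deserialize the array on the read side. A minimal usage sketch, assuming the class as defined above (the file-name values are illustrative):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class TextArrayWritableDemo {
        public static void main(String[] args) {
            TextArrayWritable values = new TextArrayWritable();
            // set() takes the backing array; elements must match the declared Text type
            values.set(new Writable[] { new Text("in1.txt"), new Text("in2.txt") });
            // toStrings() maps each element through its toString()
            for (String s : values.toStrings()) {
                System.out.println(s);
            }
        }
    }
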
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-impl/src/main/java/org/apache/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/taverna-execution-impl/src/main/java/org/apache/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java b/taverna-execution-impl/src/main/java/org/apache/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java
new file mode 100644
index 0000000..d9bfbf1
--- /dev/null
+++ b/taverna-execution-impl/src/main/java/org/apache/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java
@@ -0,0 +1,353 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl;
+
+import java.net.URI;
+import java.text.MessageFormat;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.taverna.platform.capability.api.ActivityConfigurationException;
+import org.apache.taverna.platform.capability.api.ActivityNotFoundException;
+import org.apache.taverna.platform.capability.api.DispatchLayerConfigurationException;
+import org.apache.taverna.platform.capability.api.DispatchLayerNotFoundException;
+import org.apache.taverna.platform.execution.api.ExecutionEnvironment;
+import org.apache.taverna.platform.execution.api.ExecutionEnvironmentService;
+import org.apache.taverna.platform.execution.api.ExecutionService;
+import org.apache.taverna.scufl2.api.activity.Activity;
+import org.apache.taverna.scufl2.api.common.NamedSet;
+import org.apache.taverna.scufl2.api.common.Scufl2Tools;
+import org.apache.taverna.scufl2.api.configurations.Configuration;
+import org.apache.taverna.scufl2.api.core.Processor;
+import org.apache.taverna.scufl2.api.profiles.ProcessorBinding;
+import org.apache.taverna.scufl2.api.profiles.Profile;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Implementation of the ExecutionEnvironmentService.
+ *
+ * @author David Withers
+ */
+public class ExecutionEnvironmentServiceImpl implements ExecutionEnvironmentService {
+	private static final Logger logger = Logger.getLogger(ExecutionEnvironmentServiceImpl.class.getName());
+
+	@SuppressWarnings("unused")
+	private final Scufl2Tools scufl2Tools = new Scufl2Tools();
+	private Set<ExecutionService> executionServices;
+
+	@Override
+	public Set<ExecutionEnvironment> getExecutionEnvironments() {
+		Set<ExecutionEnvironment> executionEnvironments = new HashSet<>();
+		for (ExecutionService executionService : executionServices)
+			executionEnvironments.addAll(executionService
+					.getExecutionEnvironments());
+		return executionEnvironments;
+	}
+
+	@Override
+	public Set<ExecutionEnvironment> getExecutionEnvironments(Profile profile) {
+		Set<ExecutionEnvironment> validExecutionEnvironments = new HashSet<>();
+		for (ExecutionEnvironment executionEnvironment : getExecutionEnvironments())
+			if (isValidExecutionEnvironment(executionEnvironment, profile))
+				validExecutionEnvironments.add(executionEnvironment);
+		return validExecutionEnvironments;
+	}
+
+	/**
+	 * Sets the ExecutionServices that will be used to find ExecutionEnvironments.
+	 *
+	 * @param executionServices
+	 *            the ExecutionServices that will be used to find ExecutionEnvironments
+	 */
+	public void setExecutionServices(Set<ExecutionService> executionServices) {
+		this.executionServices = executionServices;
+	}
+
+	/**
+	 * Checks whether an ExecutionEnvironment supports every activity bound in a
+	 * Profile.
+	 *
+	 * @param executionEnvironment
+	 *            the ExecutionEnvironment to check
+	 * @param profile
+	 *            the Profile whose processor bindings must be supported
+	 * @return true if every bound activity and its configuration is supported
+	 */
+	private boolean isValidExecutionEnvironment(ExecutionEnvironment executionEnvironment,
+			Profile profile) {
+		NamedSet<ProcessorBinding> processorBindings = profile.getProcessorBindings();
+		for (ProcessorBinding processorBinding : processorBindings) {
+			Activity activity = processorBinding.getBoundActivity();
+			if (!executionEnvironment.activityExists(activity.getType())) {
+				logger.fine(MessageFormat.format("{0} does not contain activity {1}",
+						executionEnvironment.getName(), activity.getType()));
+				return false;
+			}
+			Configuration activityConfiguration = activity.getConfiguration();
+			if (!isValidActivityConfiguration(executionEnvironment, activityConfiguration, activity)) {
+				logger.fine(MessageFormat.format("Invalid activity configuration for {1} in {0}",
+						executionEnvironment.getName(), activity.getType()));
+				return false;
+			}
+			@SuppressWarnings("unused")
+			Processor processor = processorBinding.getBoundProcessor();
+			// TODO check that environment has required dispatch layers for processor configuration
+//			for (DispatchStackLayer dispatchStackLayer : processor.getDispatchStack()) {
+//				if (!executionEnvironment.dispatchLayerExists(dispatchStackLayer
+//						.getType())) {
+//					logger.fine(MessageFormat.format("{0} does not contain dispatch layer {1}",
+//							executionEnvironment.getName(),
+//							dispatchStackLayer.getType()));
+//					return false;
+//				}
+//
+//				List<Configuration> dispatchLayerConfigurations = scufl2Tools.configurationsFor(dispatchStackLayer, profile);
+//				if (dispatchLayerConfigurations.size() > 1) {
+//					logger.fine(MessageFormat.format("{0} contains multiple configurations for dispatch layer {1}",
+//							executionEnvironment.getName(),
+//							dispatchStackLayer.getType()));
+//				} else if (dispatchLayerConfigurations.size() == 1) {
+//					if (!isValidDispatchLayerConfiguration(executionEnvironment, dispatchLayerConfigurations.get(0), dispatchStackLayer)) {
+//						logger.fine(MessageFormat.format("Invalid dispatch layer configuration for {1} in {0}",
+//								executionEnvironment.getName(), dispatchStackLayer.getType()));
+//						return false;
+//					}
+//				}
+//			}
+		}
+		return true;
+	}
+
+	private boolean isValidActivityConfiguration(ExecutionEnvironment executionEnvironment,
+			Configuration configuration, Activity activity) {
+		try {
+			configuration.getJson();
+			configuration.getJsonSchema();
+			@SuppressWarnings("unused")
+			JsonNode environmentSchema = executionEnvironment.getActivityConfigurationSchema(activity.getType());
+			// TODO validate against schema
+		} catch (ActivityNotFoundException e) {
+			logger.fine(MessageFormat.format("{0} does not contain activity {1}",
+					executionEnvironment.getName(), activity.getType()));
+			return false;
+		} catch (ActivityConfigurationException e) {
+			logger.fine(MessageFormat.format("Configuration for {1} is incorrect in {0}",
+					executionEnvironment.getName(), activity.getType()));
+			return false;
+		}
+		return true;
+	}
+
+	@SuppressWarnings("unused")
+	private boolean isValidDispatchLayerConfiguration(ExecutionEnvironment executionEnvironment,
+			Configuration configuration, URI dispatchLayerType) {
+		try {
+			JsonNode environmentSchema = executionEnvironment.getDispatchLayerConfigurationSchema(dispatchLayerType);
+			// TODO validate against schema
+		} catch (DispatchLayerNotFoundException e) {
+			logger.fine(MessageFormat.format("{0} does not contain dispatch layer {1}",
+					executionEnvironment.getName(), dispatchLayerType));
+			return false;
+		} catch (DispatchLayerConfigurationException e) {
+			logger.fine(MessageFormat.format("Configuration for {1} is incorrect in {0}",
+					executionEnvironment.getName(), dispatchLayerType));
+			return false;
+		}
+		return true;
+	}
+
+//	/**
+//	 * @param propertyResourceDefinition
+//	 * @param propertyResource
+//	 * @return
+//	 */
+//	private boolean isValidPropertyResource(Configuration configuration,
+//			PropertyResourceDefinition propertyResourceDefinition, PropertyResource propertyResource) {
+//		if (!propertyResourceDefinition.getTypeURI().equals(propertyResource.getTypeURI())) {
+//			logger.fine(MessageFormat.format(
+//					"Property type {0} does not equal property definition type {1}",
+//					propertyResource.getTypeURI(), propertyResourceDefinition.getTypeURI()));
+//			return false;
+//		}
+//		List<PropertyDefinition> propertyDefinitions = propertyResourceDefinition
+//				.getPropertyDefinitions();
+//		Map<URI, SortedSet<PropertyObject>> properties = propertyResource.getProperties();
+//		for (PropertyDefinition propertyDefinition : propertyDefinitions) {
+//			SortedSet<PropertyObject> propertySet = properties.get(propertyDefinition
+//					.getPredicate());
+//			if (propertySet == null) {
+//				if (propertyDefinition.isRequired()) {
+//					logger.fine(MessageFormat.format("Required property {0} is missing",
+//							propertyDefinition.getPredicate()));
+//					return false;
+//				}
+//			} else {
+//				if (propertySet.size() == 0 && propertyDefinition.isRequired()) {
+//					logger.fine(MessageFormat.format("Required property {0} is missing",
+//							propertyDefinition.getPredicate()));
+//					return false;
+//				}
+//				if (propertySet.size() > 1 && !propertyDefinition.isMultiple()) {
+//					logger.fine(MessageFormat.format(
+//							"{0} properties found for singleton property {1}", propertySet.size(),
+//							propertyDefinition.getPredicate()));
+//					return false;
+//				}
+//				if (propertySet.size() > 1 && propertyDefinition.isMultiple() && propertyDefinition.isOrdered()) {
+//					logger.fine(MessageFormat.format(
+//							"{0} property lists found for property {1}", propertySet.size(),
+//							propertyDefinition.getPredicate()));
+//					return false;
+//				}
+//				for (PropertyObject property : propertySet) {
+//					if (propertyDefinition.isMultiple() && propertyDefinition.isOrdered()) {
+//						if (property instanceof PropertyList) {
+//							PropertyList propertyList = (PropertyList) property;
+//							for (PropertyObject propertyObject : propertyList) {
+//								if (!isValidProperty(configuration, propertyDefinition, propertyObject)) {
+//									logger.fine(MessageFormat.format("Property {0} is invalid",
+//											propertyDefinition.getPredicate()));
+//									return false;
+//								}
+//							}
+//						}
+//
+//					} else if (!isValidProperty(configuration, propertyDefinition, property)) {
+//						logger.fine(MessageFormat.format("Property {0} is invalid",
+//								propertyDefinition.getPredicate()));
+//						return false;
+//					}
+//				}
+//			}
+//		}
+//		return true;
+//	}
+//
+//	/**
+//	 * @param propertyDefinition
+//	 * @param property
+//	 * @return
+//	 */
+//	private boolean isValidProperty(Configuration configuration,
+//			PropertyDefinition propertyDefinition, PropertyObject property) {
+//		if (propertyDefinition instanceof PropertyLiteralDefinition) {
+//			if (property instanceof PropertyLiteral) {
+//				PropertyLiteralDefinition propertyLiteralDefinition = (PropertyLiteralDefinition) propertyDefinition;
+//				PropertyLiteral propertyLiteral = (PropertyLiteral) property;
+//				if (!propertyLiteral.getLiteralType().equals(
+//						propertyLiteralDefinition.getLiteralType())) {
+//					logger.fine(MessageFormat.format(
+//							"Property type {0} does not equal property definition type {1}",
+//							propertyLiteral.getLiteralType(),
+//							propertyLiteralDefinition.getLiteralType()));
+//					return false;
+//				}
+//				LinkedHashSet<String> options = propertyLiteralDefinition.getOptions();
+//				if (options != null && options.size() > 0) {
+//					if (!options.contains(propertyLiteral.getLiteralValue())) {
+//						logger.fine(MessageFormat.format("Property value {0} is not permitted",
+//								propertyLiteral.getLiteralValue()));
+//						return false;
+//					}
+//				}
+//			} else {
+//				logger.fine(MessageFormat.format("Expected a PropertyLiteral but got a {0}",
+//						property.getClass().getSimpleName()));
+//				return false;
+//			}
+//		} else if (propertyDefinition instanceof PropertyReferenceDefinition) {
+//			if (property instanceof PropertyReference) {
+//				PropertyReferenceDefinition propertyReferenceDefinition = (PropertyReferenceDefinition) propertyDefinition;
+//				PropertyReference propertyReference = (PropertyReference) property;
+//				LinkedHashSet<URI> options = propertyReferenceDefinition.getOptions();
+//				if (options != null && options.size() > 0) {
+//					if (!options.contains(propertyReference.getResourceURI())) {
+//						logger.fine(MessageFormat.format("Property value {0} is not permitted",
+//								propertyReference.getResourceURI()));
+//						return false;
+//					}
+//				}
+//			} else {
+//				logger.fine(MessageFormat.format("Expected a PropertyReference but got a {0}",
+//						property.getClass().getSimpleName()));
+//				return false;
+//			}
+//		} else if (propertyDefinition instanceof PropertyResourceDefinition) {
+//			if (property instanceof PropertyResource) {
+//				PropertyResourceDefinition propertyResourceDefinition = (PropertyResourceDefinition) propertyDefinition;
+//				PropertyResource propertyResource = (PropertyResource) property;
+//				return isValidPropertyResource(configuration, propertyResourceDefinition,
+//						propertyResource);
+//			} else if (property instanceof PropertyReference) {
+//				// special cases where a PropertyResource is actually a reference to a WorkflowBundle component
+//				PropertyReference propertyReference = (PropertyReference) property;
+//				WorkflowBundle workflowBundle = scufl2Tools.findParent(WorkflowBundle.class,
+//						configuration);
+//				URI configUri = uriTools.uriForBean(configuration);
+//				URI referenceUri = configUri.resolve(propertyReference.getResourceURI());
+//				if (workflowBundle != null) {
+//					URI predicate = propertyDefinition.getPredicate();
+//					WorkflowBean workflowBean = uriTools.resolveUri(referenceUri, workflowBundle);
+//					if (workflowBean == null) {
+//						logger.fine(MessageFormat.format(
+//								"Cannot resolve {0} in WorkflowBundle {1}",
+//								propertyReference.getResourceURI(), workflowBundle.getName()));
+//					}
+//					if (predicate.equals(SCUFL2.resolve("#definesInputPort"))) {
+//						if (workflowBean == null) {
+//							return false;
+//						}
+//						if (!(workflowBean instanceof InputActivityPort)) {
+//							logger.fine(MessageFormat.format(
+//									"{0} resolved to a {1}, expected an InputActivityPort",
+//									propertyReference.getResourceURI(), workflowBean.getClass()
+//											.getSimpleName()));
+//							return false;
+//						}
+//					} else if (predicate.equals(SCUFL2.resolve("#definesOutputPort"))) {
+//						if (workflowBean == null) {
+//							return false;
+//						}
+//						if (!(workflowBean instanceof OutputActivityPort)) {
+//							logger.fine(MessageFormat.format(
+//									"{0} resolved to a {1}, expected an OutputActivityPort",
+//									propertyReference.getResourceURI(), workflowBean.getClass()
+//											.getSimpleName()));
+//							return false;
+//						}
+//					} else {
+//						logger.fine(MessageFormat.format("Unexpected reference to {0}", predicate));
+//					}
+//				} else {
+//					logger.fine(MessageFormat
+//							.format("Cannot resolve reference to {0} because Configuration {1} is not contained within a WorkflowBundle",
+//									referenceUri, configuration.getName()));
+//				}
+//			} else {
+//				logger.fine(MessageFormat.format("Expected a PropertyResource or PropertyReference but got a {0}",
+//						property.getClass().getSimpleName()));
+//				return false;
+//			}
+//		} else {
+//			logger.fine(MessageFormat.format("Unknown property definition class {0}",
+//					propertyDefinition.getClass().getSimpleName()));
+//			return false;
+//		}
+//		return true;
+//	}
+
+}
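
Both isValidActivityConfiguration and isValidDispatchLayerConfiguration above fetch a configuration schema and then stop at the "// TODO validate against schema" step. A minimal sketch of how that gap could be closed over the Jackson JsonNode values already in hand, assuming the com.github.fge json-schema-validator library (a choice this commit does not make) and a hypothetical helper named conformsToSchema:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.github.fge.jsonschema.main.JsonSchema;
    import com.github.fge.jsonschema.main.JsonSchemaFactory;

    public final class ConfigurationSchemaCheck {
        /** Hypothetical helper: true if json is valid against the given JSON Schema. */
        public static boolean conformsToSchema(JsonNode json, JsonNode schema) {
            try {
                JsonSchema validator = JsonSchemaFactory.byDefault().getJsonSchema(schema);
                return validator.validate(json).isSuccess();
            } catch (Exception e) {
                // a schema that cannot be processed cannot vouch for the configuration
                return false;
            }
        }
    }

Inside isValidActivityConfiguration this would amount to returning conformsToSchema(configuration.getJson(), environmentSchema) rather than unconditionally returning true once the schema has been fetched.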