You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/23 17:38:31 UTC
[47/51] [partial] incubator-taverna-engine git commit:
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaMapper.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaMapper.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaMapper.java
new file mode 100644
index 0000000..132af05
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaMapper.java
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+
+/**
+ * Prototype mapper intended to run a Taverna activity on the inputs of one record.
+ * <p>
+ * The value maps each input port name to the path of the data on that port. For
+ * now the mapper only logs the key, opens each input file, and emits an empty map
+ * of output ports.
+ * <p>
+ * NOTE(review): {@code int[]} and {@code Map} are not {@code Writable}; confirm a
+ * suitable serialization is configured before running this in a real job.
+ */
+public class TavernaMapper extends org.apache.hadoop.mapreduce.Mapper<int[], Map<String, Path>, Object, Object> {
+
+    /** Task context captured in {@link #setup}; not read anywhere yet. */
+    private Context context;
+
+    /** Remembers the task context for later use. */
+    @Override
+    protected void setup(Context context) throws IOException,
+            InterruptedException {
+        this.context = context;
+    }
+
+    /**
+     * Processes one record: logs its key, opens and closes the data file of every
+     * input port, and writes an empty map of output ports under the same key.
+     *
+     * @param key the record key, logged as its index
+     * @param value input port name to the path of the data on that port
+     * @param context task context supplying the job configuration and the output
+     */
+    @Override
+    protected void map(int[] key, Map<String, Path> value,
+            Context context) throws IOException, InterruptedException {
+
+        // Value contains a map of input ports to data values on those ports
+        // (i.e. file paths to data on the input ports)
+
+
+        // Get the activity and invoke it with the passed inputs per port
+//
+// String activityClassName = context.getConfiguration().get("taverna.activity.class");
+// String activityConfigurationXML = context.getConfiguration().get("taverna.activity.configuration");
+//
+// ClassLoader classLoader = this.getClass().getClassLoader();
+// Class<?> activityClass = null;
+// AbstractAsynchronousActivity<?> activity = null;
+// try {
+// activityClass = classLoader.loadClass(activityClassName);
+// activity = (AbstractAsynchronousActivity<?>) activityClass.newInstance();
+// } catch (ClassNotFoundException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// } catch (InstantiationException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// } catch (IllegalAccessException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// }
+//
+// activity.configure(activityConfigurationXML);
+// activity.executeAsynch(data, callback);
+
+        // Arrays.toString prints the index values instead of the array's identity hash.
+        System.out.println("Index: " + Arrays.toString(key));
+
+        for (Map.Entry<String, Path> port : value.entrySet()) {
+            String inputPortName = port.getKey();
+            Path inputFilePath = port.getValue();
+            // Resolve the file system from the job configuration; a null
+            // Configuration makes Path.getFileSystem throw NullPointerException.
+            FSDataInputStream fileInputStream =
+                    inputFilePath.getFileSystem(context.getConfiguration()).open(inputFilePath);
+            try {
+                // TODO: read the values from the input file and concatenate them
+            } finally {
+                // Close per record so open file handles do not accumulate.
+                fileInputStream.close();
+            }
+            System.out.println("Input port: " + inputPortName + ". Input value: "+ inputFilePath +".");
+        }
+
+        // Map of output ports to data values on those ports
+        // (i.e. file paths to data on the output ports)
+        Map<String, Path> outputValue = new HashMap<String, Path>();
+        context.write(key, outputValue);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
new file mode 100644
index 0000000..613f944
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Turns the listing of the path named by a {@link FileSplit} into records.
+ * <p>
+ * Every entry listed under the split's path becomes one record. The key is the
+ * entry's name parsed as a long; the value is a {@link MapWritable} holding
+ * {@code "tag"} (the value mapped to the split path's name in the
+ * {@code taverna.datalinks} configuration) and {@code "record"} (the entry's path).
+ *
+ * @author David Withers
+ */
+public class TavernaRecordReader extends RecordReader<LongWritable, MapWritable> {
+
+    private FileSplit fileSplit;
+    /** Name of the split's path; the lookup key into {@link #datalinks}. */
+    private String recordName;
+    /** Entries listed under the split's path, one per record. */
+    private FileStatus[] files;
+    /** Position of the current record in {@link #files}; -1 before the first {@link #nextKeyValue()}. */
+    private int index = -1;
+    /** Parsed {@code taverna.datalinks} configuration. */
+    private Map<String, String> datalinks;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        fileSplit = (FileSplit) split;
+        Path path = fileSplit.getPath();
+        recordName = path.getName();
+        files = path.getFileSystem(context.getConfiguration()).listStatus(path);
+        setDatalinks(context);
+    }
+
+    /**
+     * Parses the {@code taverna.datalinks} configuration property into {@link #datalinks}.
+     * <p>
+     * The property is a comma-separated list of {@code name|value} pairs; an entry that
+     * does not split into exactly two parts is ignored, and a missing property leaves
+     * the map empty.
+     *
+     * @param context task context whose configuration is read
+     */
+    private void setDatalinks(TaskAttemptContext context) {
+        datalinks = new HashMap<String, String>();
+        String datalinkConfig = context.getConfiguration().get("taverna.datalinks");
+        if (datalinkConfig != null) {
+            String[] datalinksSplit = datalinkConfig.split(",");
+            for (String datalink : datalinksSplit) {
+                String[] split = datalink.split("\\|");
+                if (split.length == 2) {
+                    datalinks.put(split[0], split[1]);
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        index++;
+        return index < files.length;
+    }
+
+    /** Returns the current entry's name as a long (NumberFormatException if it is not numeric). */
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+        return new LongWritable(Long.valueOf(files[index].getPath().getName()));
+    }
+
+    /**
+     * Returns the tag and path of the current entry.
+     * <p>
+     * NOTE(review): if the split path's name has no {@code taverna.datalinks} entry the tag
+     * is null and {@code new Text(null)} throws; confirm every input directory is mapped.
+     */
+    @Override
+    public MapWritable getCurrentValue() throws IOException, InterruptedException {
+        MapWritable mapWritable = new MapWritable();
+        mapWritable.put(new Text("tag"), new Text(datalinks.get(recordName)));
+        mapWritable.put(new Text("record"), new Text(files[index].getPath().toString()));
+        return mapWritable;
+    }
+
+    /**
+     * Returns the fraction of entries consumed so far, in the range [0, 1].
+     * <p>
+     * The division is done in floating point (integer division reports 0 until the
+     * last record), and the result is clamped because {@link #nextKeyValue()} keeps
+     * advancing {@code index} past the end of {@link #files}.
+     */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        if (files.length == 0) {
+            return 1;
+        }
+        return Math.min(1f, (index + 1) / (float) files.length);
+    }
+
+    /** Nothing to release: the reader keeps no open streams. */
+    @Override
+    public void close() throws IOException {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaReducer.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaReducer.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaReducer.java
new file mode 100644
index 0000000..bf489db
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TavernaReducer.java
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Placeholder reducer for the Taverna Hadoop job.
+ * <p>
+ * It keeps a reference to the task context and, for now, emits nothing for any key.
+ */
+public class TavernaReducer extends
+        org.apache.hadoop.mapreduce.Reducer<int[], Map<String, Path>, Object, Object> {
+
+    /** Context received in {@link #setup}; kept for later use but not read yet. */
+    private Context taskContext;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        taskContext = context;
+    }
+
+    /**
+     * Not implemented yet: the grouped {@code values} for {@code key} are ignored
+     * and nothing is written to {@code context}.
+     */
+    @Override
+    protected void reduce(int[] key, Iterable<Map<String, Path>> values, Context context)
+            throws IOException, InterruptedException {
+        // Intentionally left empty.
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/Test.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/Test.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/Test.java
new file mode 100644
index 0000000..a23be06
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/Test.java
@@ -0,0 +1,87 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Command-line driver that runs {@link TavernaMapper} and {@link TavernaReducer}
+ * over the input read by {@link TavernaInputFormat}.
+ * <p>
+ * Usage: {@code Test [generic options] <input path> <output path>}
+ *
+ * @author David Withers
+ */
+public class Test extends Configured implements Tool {
+
+    /**
+     * Configures the job, submits it and waits for it to complete.
+     *
+     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
+     * @return 0 if the job succeeded, 1 if it failed, 2 if the paths are missing
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        // Report usage instead of failing with ArrayIndexOutOfBoundsException.
+        if (args.length < 2) {
+            System.err.println("Usage: " + Test.class.getName() + " <input path> <output path>");
+            ToolRunner.printGenericCommandUsage(System.err);
+            return 2;
+        }
+        Configuration configuration = getConf();
+        Job job = new Job(configuration);
+        job.setJarByClass(Test.class);
+        // NOTE(review): name appears copied from the WordCount example; consider a Taverna-specific one.
+        job.setJobName("wordcount");
+
+        // NOTE(review): int[] and Map do not implement Writable; confirm a serialization
+        // that handles them is registered (io.serializations), otherwise the job is expected to fail.
+        job.setOutputKeyClass(int[].class);
+        job.setOutputValueClass(Map.class);
+
+        job.setMapperClass(TavernaMapper.class);
+// job.setCombinerClass(Reduce.class);
+        job.setReducerClass(TavernaReducer.class);
+
+        job.setInputFormatClass(TavernaInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
+
+        TavernaInputFormat.setInputPaths(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+        boolean success = job.waitForCompletion(true);
+        return success ? 0 : 1;
+    }
+
+    /** Runs the job through {@link ToolRunner} and exits with its return code. */
+    public static void main(String[] args) throws Exception {
+        int ret = ToolRunner.run(new Test(), args);
+        System.exit(ret);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TextArrayWritable.java b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
new file mode 100644
index 0000000..4c5c115
--- /dev/null
+++ b/taverna-execution-hadoop/src/main/java/org/apache/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.hadoop;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * An {@link ArrayWritable} whose elements are {@link Text} values.
+ * <p>
+ * The element class is fixed by the no-argument constructor, so instances can be
+ * created without naming the value type.
+ */
+public class TextArrayWritable extends ArrayWritable {
+
+    /** Creates an instance whose element class is {@link Text}; no values are set yet. */
+    public TextArrayWritable() {
+        super(Text.class);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputFormat.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputFormat.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputFormat.java
deleted file mode 100644
index 122b473..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputFormat.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * An input format that receives an input directory containing a number of directories with input files
- * for each input port to a Taverna processor/activity that will be executed as part of this
- * MapReduce job. Mapping between directory name -> Taverna processor/activity input port name
- * is carried in the job's Context.
- *
- * @author Alex Nenadic
- *
- */
-public class CrossProductInputFormat extends
- FileInputFormat<Text, TextArrayWritable> {
-
- private static final Log Logger = LogFactory.getLog(CrossProductInputFormat.class);
-
- // Do not split files into blocks
- @Override
- protected boolean isSplitable(JobContext context, Path filename) {
- return false;
- }
-
- @Override
- public RecordReader<Text, TextArrayWritable> createRecordReader(
- InputSplit split, TaskAttemptContext context) {
- return new CrossProductRecordReader();
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
-
- // Generate splits. Split is a list of directories where each directory
- // contains inputs for one input port of the Taverna processor/activity we
- // are invoking.
- // We will have only one split for cross product that will know about all
- // the files in all input directories and will generate RecordReaders
- // for every combination of files inside these directories.
-// CrossProductInputSplit split = new CrossProductInputSplit();
-
- // List the input port directories contained in the input directory passed
- // in from the command line.
- List<FileStatus> inputPortDirectories = listStatus(job);
-
- final FileSystem fs = job.getWorkingDirectory().getFileSystem(job.getConfiguration());
- Path workingDirectory = job.getWorkingDirectory();
- System.out.println("Working directory: " + workingDirectory);
- System.out.println("Adding directories to the cross product split:");
- ArrayList<Path> inputPortDirectoriesPaths = new ArrayList<Path>();
- for (FileStatus inputPortDirectory : inputPortDirectories){
- // TODO input port directories need to be ordered in the order of the
- // input ports of the Taverna processor/activity they are going into
-
- //inputPortDirectoriesPaths.add(new Text(inputPortDirectory.getPath().toString()));
- inputPortDirectoriesPaths.add(inputPortDirectory.getPath());
- System.out.println(inputPortDirectory.getPath());
-
- }
- CrossProductInputSplit split = new CrossProductInputSplit(workingDirectory, inputPortDirectoriesPaths);
-
-
- List<InputSplit> splits = new ArrayList<InputSplit>();
- splits.add(split);
-
- return splits;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java
deleted file mode 100644
index 2ff9113..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductInputSplit.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/**
- *
- *
- * @author Alex Nenadic
- */
-public class CrossProductInputSplit extends FileSplit {
- //
- // private long length = 0;
- // private String[] hosts;
- private List<Path> inputPortDirectories;
- private Path workingDirectory;
-
- public CrossProductInputSplit() {
- super(null,0,0,null);
- inputPortDirectories = new ArrayList<Path>();
- System.out.println("Calling default constructor for cross product split");
- }
-
- public CrossProductInputSplit(Path workingDirectory, List<Path> inputPortDirectories) {
- // this.length = length;
- // this.hosts = hosts;
- super(workingDirectory, 0, 0, new String[0]);
- this.workingDirectory = workingDirectory;
- this.inputPortDirectories = inputPortDirectories;
- System.out.println("Calling non-default constructor for cross product split");
- }
-
- public void addInputPortDirectory(Path path) {
- inputPortDirectories.add(path);
- }
-
- public List<Path> getInputPortDirectories() {
- return inputPortDirectories;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- Text.writeString(out, workingDirectory.toString());
- out.writeInt(inputPortDirectories.size());
- for (Path path : inputPortDirectories) {
- Text.writeString(out, path.toString());
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- workingDirectory = new Path(Text.readString(in));
- int length = in.readInt();
- for (int i = 0; i < length; i++) {
- inputPortDirectories.add(new Path(Text.readString(in)));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java
deleted file mode 100644
index 6602f55..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductRecordReader.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.pingel.util.CrossProduct;
-
-public class CrossProductRecordReader extends RecordReader<Text, TextArrayWritable>{
-
- private static final Log Logger = LogFactory.getLog(CrossProductRecordReader.class);
-
- // Input directories (one for each port) containing files that are used
- // as inputs to Taverna processor/activity
- private List<Path> inputPortDirectories;
-
- private CrossProduct<String> crossProduct ;
-
- private Iterator<List<String>> crossProductIterator;
-
- private List<String> currentIndexes;
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
-
- System.out.println("Inside record reader's initialize");
-
- CrossProductInputSplit crossProductSplit = (CrossProductInputSplit)split;
- inputPortDirectories = crossProductSplit.getInputPortDirectories();
- System.out.println("Record reader received " + +inputPortDirectories.size() + " input port directories");
-
- List<List<String>> iterables = new ArrayList<List<String>>();
- for (int i=0; i<inputPortDirectories.size();i++ ){
-
- Path inputPortDirectory = inputPortDirectories.get(i);
- //Path inputPortDirectory = inputPortDirectories.get(i);
- FileStatus[] files = inputPortDirectory.getFileSystem(context.getConfiguration()).listStatus(inputPortDirectory);
- List<String> fileNames = new ArrayList<String>();
- for (FileStatus file : files){
- fileNames.add(file.getPath().getName());
- }
- iterables.add(fileNames);
- }
-
- crossProduct = new CrossProduct<String>(iterables);
- crossProductIterator = crossProduct.iterator();
-
- }
-
- @Override
- public boolean nextKeyValue(){
-
- boolean hasNextKey = crossProductIterator.hasNext();
- System.out.println("Has record reader next key value? " + hasNextKey);
- if (hasNextKey){
- currentIndexes = crossProductIterator.next();
- }
- return hasNextKey;
- }
-
- @Override
- public Text getCurrentKey() throws IOException, InterruptedException {
-
- StringBuffer sb = new StringBuffer();
- for (String index : currentIndexes){
- sb.append(index + ".");
- }
- // Remove last "."
- String indexesString = sb.toString();
- System.out.println("Get current key: " + indexesString);
- if (indexesString.contains(".")){
- indexesString = indexesString.substring(0, indexesString.length() - 1);
- }
- return new Text(indexesString);
- }
-
- @Override
- public TextArrayWritable getCurrentValue() {
-
- TextArrayWritable arrayWritable = new TextArrayWritable();
- Text[] array = new Text[currentIndexes.size()];
- for(int i= 0; i< currentIndexes.size(); i++){
- Path file = new Path(inputPortDirectories.get(i).toString(), currentIndexes.get(i));
- array[i] = new Text(file.toString());
- }
- arrayWritable.set(array);
- return arrayWritable;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public void close() throws IOException {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java
deleted file mode 100644
index e5dc063..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/CrossProductTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class CrossProductTest extends Configured implements Tool {
-
- public static class Map extends Mapper<Text, TextArrayWritable, Text, TextArrayWritable> {
- public void map(Text key, TextArrayWritable value, Context context) throws IOException,
- InterruptedException {
- System.out.println("Map key = " + key);
- System.out.println("Map value = " );
-
- for (int i = 0; i < value.get().length; i++){
- System.out.println(" " + value.get()[i]);
- }
-
- context.write(key, value);
- }
- }
-
- public static class Reduce extends Reducer<Text, TextArrayWritable, Text, Text> {
- public void reduce(Text key, Iterable<TextArrayWritable> values, Context context)
- throws IOException, InterruptedException {
-
- System.out.println("Reduce key = " + key);
- context.write(key, f(values));
- }
-
- private Text f(Iterable<TextArrayWritable> values) {
- StringBuilder sb = new StringBuilder();
-
- // There should be only one array
- TextArrayWritable arrayValue = values.iterator().next();
-
- for (int i = 0; i < arrayValue.get().length; i++){
- sb.append(arrayValue.get()[i] + "\nx");
- }
- String str = sb.toString();
- if (str.contains("\nx")){
- str = str.substring(0, sb.lastIndexOf("\nx") -1);
- }
- System.out.println("Result of function f(): " + str);
-
- return new Text(str);
- }
- }
-
- public int run(String[] args) throws Exception {
-
- Configuration configuration = getConf();
- configuration.set("taverna.datalinks", "A|X,B|Y");
- System.out.println(configuration);
- Job job = new Job(configuration);
- job.setJarByClass(CrossProductTest.class);
- job.setJobName("crossproduct");
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(TextArrayWritable.class);
-
- job.setMapperClass(Map.class);
-// job.setCombinerClass(Reduce.class);
- job.setReducerClass(Reduce.class);
-
- job.setInputFormatClass(CrossProductInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
-
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- System.out.println("Input dir: " + args[0]);
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- System.out.println("Output dir: " + args[1]);
-
- boolean success = job.waitForCompletion(true);
- return success ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- int ret = ToolRunner.run(new CrossProductTest(), args);
- System.exit(ret);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java
deleted file mode 100644
index 2fa38e4..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/DotProductTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * Experimental job: records sharing a key are grouped and the reducer concatenates their
- * "record" values into a single line of text.
- */
-public class DotProductTest extends Configured implements Tool {
-
-    /** Identity mapper that logs each record's key, "tag" and "record" before passing it on. */
-    public static class Map extends Mapper<LongWritable, MapWritable, LongWritable, MapWritable> {
-        @Override
-        public void map(LongWritable key, MapWritable value, Context context) throws IOException,
-                InterruptedException {
-            System.out.println("Map key = " + key);
-            System.out.println("Map value tag = " + value.get(new Text("tag")));
-            System.out.println("Map value record = " + value.get(new Text("record")));
-            context.write(key, value);
-        }
-    }
-
-    /** Joins the "record" values of all map outputs sharing a key, space separated. */
-    public static class Reduce extends Reducer<LongWritable, MapWritable, LongWritable, Text> {
-        @Override
-        public void reduce(LongWritable key, Iterable<MapWritable> values, Context context)
-                throws IOException, InterruptedException {
-            System.out.println("Reduce key = " + key);
-            // The values Iterable is single-pass in Hadoop: a second f(values) would only see an
-            // exhausted iterator and emit a duplicate, empty record. Write the result once.
-            context.write(key, f(values));
-        }
-
-        /** Concatenates each value's "record" entry followed by a space. */
-        private Text f(Iterable<MapWritable> values) {
-            StringBuilder sb = new StringBuilder();
-            for (MapWritable value : values) {
-                System.out.println("Reduce tag = " + value.get(new Text("tag")));
-                System.out.println("Reduce value = " + value.get(new Text("record")));
-                sb.append(value.get(new Text("record"))).append(' ');
-            }
-            return new Text(sb.toString());
-        }
-    }
-
-    /**
-     * Configures and runs the dot-product job.
-     *
-     * @param args
-     *            {@code args[0]} is the input path, {@code args[1]} the output directory
-     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments were missing
-     * @throws Exception
-     *             if the job cannot be configured, submitted or waited for
-     */
-    public int run(String[] args) throws Exception {
-        if (args.length < 2) {
-            System.err.println("Usage: DotProductTest <input path> <output dir>");
-            ToolRunner.printGenericCommandUsage(System.err);
-            return 2;
-        }
-
-        Configuration configuration = getConf();
-        // Read by TavernaRecordReader: comma-separated "name|tag" pairs giving the tag for
-        // records coming from the input path with that name.
-        configuration.set("taverna.datalinks", "A|X,B|Y");
-        System.out.println(configuration);
-        Job job = new Job(configuration);
-        job.setJarByClass(DotProductTest.class);
-        job.setJobName("dotproduct");
-
-        // Map emits MapWritable values while Reduce emits Text, so declare both stages.
-        job.setMapOutputKeyClass(LongWritable.class);
-        job.setMapOutputValueClass(MapWritable.class);
-        job.setOutputKeyClass(LongWritable.class);
-        job.setOutputValueClass(Text.class);
-
-        job.setMapperClass(Map.class);
-        // No combiner: Reduce changes the value type (MapWritable -> Text).
-        job.setReducerClass(Reduce.class);
-
-        job.setInputFormatClass(TavernaInputFormat.class);
-        job.setOutputFormatClass(TextOutputFormat.class);
-
-        FileInputFormat.setInputPaths(job, new Path(args[0]));
-        FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
-        boolean success = job.waitForCompletion(true);
-        return success ? 0 : 1;
-    }
-
-    public static void main(String[] args) throws Exception {
-        int ret = ToolRunner.run(new DotProductTest(), args);
-        System.exit(ret);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java
deleted file mode 100644
index 57b41c5..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputFormat.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * File input format that hands each input path, unsplit, to a {@link TavernaRecordReader}.
- *
- * @author David Withers
- */
-public class TavernaInputFormat extends FileInputFormat<LongWritable, MapWritable> {
-
-    /** Never split: the record reader lists the whole input path itself. */
-    @Override
-    protected boolean isSplitable(JobContext jobContext, Path file) {
-        return false;
-    }
-
-    /** Returns a fresh, uninitialised reader; the split and context are not used here. */
-    @Override
-    public RecordReader<LongWritable, MapWritable> createRecordReader(InputSplit inputSplit,
-            TaskAttemptContext taskContext) throws IOException, InterruptedException {
-        return new TavernaRecordReader();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java
deleted file mode 100644
index d1bf0b3..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaInputSplit.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-/**
- * Input split carrying an iteration index and a map of named input paths.
- * <p>
- * NOTE(review): the map is presumably keyed by input port name (as TavernaMapper's input is);
- * confirm. This class does not implement Writable, which Hadoop normally needs in order to
- * ship splits to tasks.
- *
- * @author David Withers
- */
-public class TavernaInputSplit extends InputSplit {
-    private final int[] index;
-    private final Map<String, Path> inputs;
-    private final long length;
-    private final String[] hosts;
-
-    /**
-     * @param index
-     *            the iteration index; stored without copying
-     * @param inputs
-     *            the named input paths; stored without copying
-     * @param length
-     *            the value reported by {@link #getLength()}
-     * @param hosts
-     *            preferred hosts, or {@code null} for none
-     */
-    public TavernaInputSplit(int[] index, Map<String, Path> inputs, long length, String[] hosts) {
-        this.index = index;
-        this.inputs = inputs;
-        this.length = length;
-        this.hosts = hosts;
-    }
-
-    public int[] getIndex() {
-        return index;
-    }
-
-    public Map<String, Path> getInputs() {
-        return inputs;
-    }
-
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-        return length;
-    }
-
-    /** Returns the preferred hosts, or a new empty array when none were given. */
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-        return hosts != null ? hosts : new String[0];
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java
deleted file mode 100644
index fa5d1dc..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaMapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-
-/**
- * Prototype mapper intended to invoke a Taverna activity on one set of inputs. Currently it
- * only logs the iteration index and each input port's path, then emits an empty output map.
- * <p>
- * NOTE(review): int[] and java.util.Map are not Writable types; confirm how these keys and
- * values are serialized before running this on a cluster.
- */
-public class TavernaMapper extends org.apache.hadoop.mapreduce.Mapper<int[], Map<String, Path>, Object, Object> {
-
-    /**
-     * @param key
-     *            iteration index of this invocation
-     * @param value
-     *            input port name to the path holding the data on that port
-     * @param context
-     *            task context; supplies the configuration and receives the output
-     */
-    @Override
-    protected void map(int[] key, Map<String, Path> value, Context context)
-            throws IOException, InterruptedException {
-        // TODO load the activity class named by "taverna.activity.class", configure it with
-        // "taverna.activity.configuration" and invoke it with the inputs below.
-
-        System.out.println("Index: " + java.util.Arrays.toString(key));
-
-        for (Map.Entry<String, Path> input : value.entrySet()) {
-            String inputPortName = input.getKey();
-            Path inputFilePath = input.getValue();
-            // Resolve the file system from the job configuration (a null Configuration fails)
-            // and always close the stream; the data itself is not consumed yet.
-            FileSystem fileSystem = inputFilePath.getFileSystem(context.getConfiguration());
-            try (FSDataInputStream fileInputStream = fileSystem.open(inputFilePath)) {
-                System.out.println("Input port: " + inputPortName + ". Input value: " + inputFilePath + ".");
-            }
-        }
-
-        // Output port name to the path holding that port's data; nothing is produced yet.
-        Map<String, Path> outputValue = new HashMap<String, Path>();
-        context.write(key, outputValue);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
deleted file mode 100644
index 190d91a..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaRecordReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/**
- * Record reader that emits one record per entry listed under the split's path.
- * <p>
- * Each record's key is the entry's file name parsed as a {@code long}. Its value is a
- * {@link MapWritable} holding a "record" entry (the entry's full path). It also holds a "tag"
- * entry when the {@code taverna.datalinks} property maps the input path's name to a tag.
- *
- * @author David Withers
- */
-public class TavernaRecordReader extends RecordReader<LongWritable, MapWritable> {
-
-    // Name of the split's path; looked up in datalinks to find this input's tag.
-    private String recordName;
-    // Entries listed under the split's path, one record each.
-    private FileStatus[] files;
-    // Position in files; -1 until nextKeyValue() is first called.
-    private int index = -1;
-    // Parsed "taverna.datalinks": text before '|' mapped to text after it.
-    private Map<String, String> datalinks;
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-        Path path = ((FileSplit) split).getPath();
-        recordName = path.getName();
-        files = path.getFileSystem(context.getConfiguration()).listStatus(path);
-        setDatalinks(context);
-    }
-
-    /**
-     * Parses the {@code taverna.datalinks} property into {@link #datalinks}. The property is a
-     * comma-separated list of {@code key|value} pairs; entries without exactly one '|' are ignored.
-     *
-     * @param context
-     *            the task context whose configuration is read
-     */
-    private void setDatalinks(TaskAttemptContext context) {
-        datalinks = new HashMap<String, String>();
-        String datalinkConfig = context.getConfiguration().get("taverna.datalinks");
-        if (datalinkConfig != null) {
-            for (String datalink : datalinkConfig.split(",")) {
-                String[] split = datalink.split("\\|");
-                if (split.length == 2) {
-                    datalinks.put(split[0], split[1]);
-                }
-            }
-        }
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-        index++;
-        return index < files.length;
-    }
-
-    @Override
-    public LongWritable getCurrentKey() throws IOException, InterruptedException {
-        // Entries are expected to be named by a number; other names throw NumberFormatException.
-        return new LongWritable(Long.parseLong(files[index].getPath().getName()));
-    }
-
-    @Override
-    public MapWritable getCurrentValue() throws IOException, InterruptedException {
-        MapWritable mapWritable = new MapWritable();
-        String tag = datalinks.get(recordName);
-        // new Text((String) null) throws a NullPointerException, so an input without a
-        // configured datalink gets no "tag" entry instead of failing the task.
-        if (tag != null) {
-            mapWritable.put(new Text("tag"), new Text(tag));
-        }
-        mapWritable.put(new Text("record"), new Text(files[index].getPath().toString()));
-        return mapWritable;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-        if (files.length == 0) {
-            return 1;
-        }
-        // Float division: integer division would report 0 until the last record.
-        // The clamp is needed because index runs one past the end once nextKeyValue() returns false.
-        return Math.min(1f, (float) (index + 1) / files.length);
-    }
-
-    @Override
-    public void close() throws IOException {
-        // Nothing to release: this reader only lists file statuses and opens no streams.
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java
deleted file mode 100644
index 11f85dd..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TavernaReducer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * Placeholder reducer for the Taverna job: {@link #reduce} currently produces no output.
- */
-public class TavernaReducer extends
-        org.apache.hadoop.mapreduce.Reducer<int[], Map<String, Path>, Object, Object> {
-
-    // Captured in setup() but not read anywhere yet.
-    private Context taskContext;
-
-    @Override
-    protected void setup(Context ctx) throws IOException, InterruptedException {
-        taskContext = ctx;
-    }
-
-    @Override
-    protected void reduce(int[] index, Iterable<Map<String, Path>> portValues, Context ctx)
-            throws IOException, InterruptedException {
-        // Intentionally empty: nothing is written for any key yet.
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java
deleted file mode 100644
index 65a75d8..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/Test.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * Driver that wires {@link TavernaInputFormat}, {@link TavernaMapper} and
- * {@link TavernaReducer} into one Hadoop job.
- * <p>
- * NOTE(review): TavernaInputFormat produces LongWritable/MapWritable records, while
- * TavernaMapper expects int[]/Map inputs. In addition, int[]/Map are not Writable output types.
- * Confirm the intended record types before relying on this job.
- *
- * @author David Withers
- */
-public class Test extends Configured implements Tool {
-
-    /**
-     * Configures and runs the job.
-     *
-     * @param args
-     *            {@code args[0]} is the input path, {@code args[1]} the output directory
-     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments were missing
-     * @throws Exception
-     *             if the job cannot be configured, submitted or waited for
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        // Fail with a usage message rather than an ArrayIndexOutOfBoundsException.
-        if (args.length < 2) {
-            System.err.println("Usage: Test <input path> <output dir>");
-            ToolRunner.printGenericCommandUsage(System.err);
-            return 2;
-        }
-
-        Configuration configuration = getConf();
-        Job job = new Job(configuration);
-        job.setJarByClass(Test.class);
-        job.setJobName("wordcount");
-
-        job.setOutputKeyClass(int[].class);
-        job.setOutputValueClass(Map.class);
-
-        job.setMapperClass(TavernaMapper.class);
-        job.setReducerClass(TavernaReducer.class);
-
-        job.setInputFormatClass(TavernaInputFormat.class);
-        job.setOutputFormatClass(TextOutputFormat.class);
-
-        TavernaInputFormat.setInputPaths(job, new Path(args[0]));
-        FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
-        boolean success = job.waitForCompletion(true);
-        return success ? 0 : 1;
-    }
-
-    public static void main(String[] args) throws Exception {
-        int ret = ToolRunner.run(new Test(), args);
-        System.exit(ret);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
----------------------------------------------------------------------
diff --git a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java b/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
deleted file mode 100644
index 1b64c77..0000000
--- a/taverna-execution-hadoop/src/main/java/uk/org/taverna/platform/execution/impl/hadoop/TextArrayWritable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.hadoop;
-
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.Text;
-
-/**
- * {@link ArrayWritable} whose elements are {@link Text}.
- * <p>
- * Hadoop instantiates Writables through a no-argument constructor. Deserializing an
- * ArrayWritable of a fixed element type therefore needs a subclass like this one to supply that type.
- */
-public class TextArrayWritable extends ArrayWritable {
-
-    /** Creates an instance whose element class is {@link Text}, with no values set. */
-    public TextArrayWritable() {
-        super(Text.class);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-impl/src/main/java/org/apache/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/taverna-execution-impl/src/main/java/org/apache/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java b/taverna-execution-impl/src/main/java/org/apache/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java
new file mode 100644
index 0000000..d9bfbf1
--- /dev/null
+++ b/taverna-execution-impl/src/main/java/org/apache/taverna/platform/execution/impl/ExecutionEnvironmentServiceImpl.java
@@ -0,0 +1,353 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl;
+
+import java.net.URI;
+import java.text.MessageFormat;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.taverna.platform.capability.api.ActivityConfigurationException;
+import org.apache.taverna.platform.capability.api.ActivityNotFoundException;
+import org.apache.taverna.platform.capability.api.DispatchLayerConfigurationException;
+import org.apache.taverna.platform.capability.api.DispatchLayerNotFoundException;
+import org.apache.taverna.platform.execution.api.ExecutionEnvironment;
+import org.apache.taverna.platform.execution.api.ExecutionEnvironmentService;
+import org.apache.taverna.platform.execution.api.ExecutionService;
+import org.apache.taverna.scufl2.api.activity.Activity;
+import org.apache.taverna.scufl2.api.common.NamedSet;
+import org.apache.taverna.scufl2.api.common.Scufl2Tools;
+import org.apache.taverna.scufl2.api.configurations.Configuration;
+import org.apache.taverna.scufl2.api.core.Processor;
+import org.apache.taverna.scufl2.api.profiles.ProcessorBinding;
+import org.apache.taverna.scufl2.api.profiles.Profile;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Implementation of the ExecutionEnvironmentService.
+ *
+ * @author David Withers
+ */
+public class ExecutionEnvironmentServiceImpl implements ExecutionEnvironmentService {
+ // Records, at FINE level, why an environment was rejected for a profile.
+ private static final Logger logger = Logger.getLogger(ExecutionEnvironmentServiceImpl.class.getName());
+
+ // Currently unused; kept for the planned dispatch-layer configuration check
+ // (see the TODO in isValidExecutionEnvironment).
+ @SuppressWarnings("unused")
+ private final Scufl2Tools scufl2Tools = new Scufl2Tools();
+ // Services queried for environments; injected via setExecutionServices().
+ private Set<ExecutionService> executionServices;
+
+    /**
+     * Collects the execution environments offered by every registered execution service.
+     * Requires that {@link #setExecutionServices(Set)} has been called.
+     *
+     * @return a new mutable set containing every offered environment
+     */
+    @Override
+    public Set<ExecutionEnvironment> getExecutionEnvironments() {
+        Set<ExecutionEnvironment> environments = new HashSet<>();
+        for (ExecutionService service : executionServices) {
+            environments.addAll(service.getExecutionEnvironments());
+        }
+        return environments;
+    }
+
+    /**
+     * Returns the execution environments able to run the given profile.
+     *
+     * @param profile
+     *            the profile whose processor bindings are checked
+     * @return a new mutable set of the environments that pass the validity check
+     */
+    @Override
+    public Set<ExecutionEnvironment> getExecutionEnvironments(Profile profile) {
+        Set<ExecutionEnvironment> compatible = new HashSet<>();
+        for (ExecutionEnvironment candidate : getExecutionEnvironments()) {
+            if (!isValidExecutionEnvironment(candidate, profile)) {
+                continue;
+            }
+            compatible.add(candidate);
+        }
+        return compatible;
+    }
+
+    /**
+     * Sets the execution services queried by {@link #getExecutionEnvironments()}.
+     * <p>
+     * The set is stored by reference, not copied. Later changes to it are therefore seen by
+     * subsequent lookups.
+     *
+     * @param services
+     *            the execution services used to find execution environments
+     */
+    public void setExecutionServices(Set<ExecutionService> services) {
+        executionServices = services;
+    }
+
+    /**
+     * Checks whether an execution environment can run every activity bound in a profile.
+     *
+     * @param executionEnvironment
+     *            the environment to check
+     * @param profile
+     *            the profile whose processor bindings are checked
+     * @return {@code true} if, for every processor binding, the environment has the bound
+     *         activity's type and accepts the activity's configuration; {@code false} otherwise
+     */
+    private boolean isValidExecutionEnvironment(ExecutionEnvironment executionEnvironment,
+            Profile profile) {
+        NamedSet<ProcessorBinding> processorBindings = profile.getProcessorBindings();
+        for (ProcessorBinding processorBinding : processorBindings) {
+            Activity activity = processorBinding.getBoundActivity();
+            if (!executionEnvironment.activityExists(activity.getType())) {
+                logger.fine(MessageFormat.format("{0} does not contain activity {1}",
+                        executionEnvironment.getName(), activity.getType()));
+                return false;
+            }
+            Configuration activityConfiguration = activity.getConfiguration();
+            if (!isValidActivityConfiguration(executionEnvironment, activityConfiguration, activity)) {
+                logger.fine(MessageFormat.format("Invalid activity configuration for {1} in {0}",
+                        executionEnvironment.getName(), activity.getType()));
+                return false;
+            }
+            // TODO check that the environment has the dispatch layers required by the bound
+            // processor (processorBinding.getBoundProcessor()). For each dispatch stack layer:
+            // - require executionEnvironment.dispatchLayerExists(layer type);
+            // - look up its configurations with scufl2Tools.configurationsFor(layer, profile);
+            // - when there is exactly one, validate it with isValidDispatchLayerConfiguration.
+        }
+        return true;
+    }
+
+    /**
+     * Checks an activity's configuration against an execution environment.
+     * <p>
+     * Currently this only reads the configuration's JSON and JSON schema and asks the
+     * environment for the activity's configuration schema. Validating against that schema is
+     * still a TODO.
+     *
+     * @return {@code false} if the environment reports the activity as missing or an
+     *         {@link ActivityConfigurationException} is raised, otherwise {@code true}
+     */
+    private boolean isValidActivityConfiguration(ExecutionEnvironment executionEnvironment,
+            Configuration configuration, Activity activity) {
+        try {
+            // Results are discarded for now; only exceptions raised by these calls matter.
+            configuration.getJson();
+            configuration.getJsonSchema();
+            @SuppressWarnings("unused")
+            JsonNode activitySchema = executionEnvironment
+                    .getActivityConfigurationSchema(activity.getType());
+            // TODO validate against schema
+            return true;
+        } catch (ActivityNotFoundException e) {
+            logger.fine(MessageFormat.format("{0} does not contain activity {1}",
+                    executionEnvironment.getName(), activity.getType()));
+            return false;
+        } catch (ActivityConfigurationException e) {
+            logger.fine(MessageFormat.format("Configuration for {1} is incorrect in {0}",
+                    executionEnvironment.getName(), activity.getType()));
+            return false;
+        }
+    }
+
+    /**
+     * Checks a dispatch layer configuration against an execution environment.
+     * <p>
+     * Currently this only asks the environment for the layer's configuration schema;
+     * {@code configuration} is not inspected yet (validation is a TODO).
+     *
+     * @return {@code false} if the environment reports the dispatch layer as missing or a
+     *         {@link DispatchLayerConfigurationException} is raised, otherwise {@code true}
+     */
+    @SuppressWarnings("unused")
+    private boolean isValidDispatchLayerConfiguration(ExecutionEnvironment executionEnvironment,
+            Configuration configuration, URI dispatchLayerType) {
+        try {
+            JsonNode dispatchLayerSchema = executionEnvironment
+                    .getDispatchLayerConfigurationSchema(dispatchLayerType);
+            // TODO validate against schema
+            return true;
+        } catch (DispatchLayerNotFoundException e) {
+            logger.fine(MessageFormat.format("{0} does not contain dispatch layer {1}",
+                    executionEnvironment.getName(), dispatchLayerType));
+            return false;
+        } catch (DispatchLayerConfigurationException e) {
+            logger.fine(MessageFormat.format("Configuration for {1} is incorrect in {0}",
+                    executionEnvironment.getName(), dispatchLayerType));
+            return false;
+        }
+    }
+
+// /**
+// * @param propertyResourceDefinition
+// * @param propertyResource
+// * @return
+// */
+// private boolean isValidPropertyResource(Configuration configuration,
+// PropertyResourceDefinition propertyResourceDefinition, PropertyResource propertyResource) {
+// if (!propertyResourceDefinition.getTypeURI().equals(propertyResource.getTypeURI())) {
+// logger.fine(MessageFormat.format(
+// "Property type {0} does not equal property definition type {1}",
+// propertyResource.getTypeURI(), propertyResourceDefinition.getTypeURI()));
+// return false;
+// }
+// List<PropertyDefinition> propertyDefinitions = propertyResourceDefinition
+// .getPropertyDefinitions();
+// Map<URI, SortedSet<PropertyObject>> properties = propertyResource.getProperties();
+// for (PropertyDefinition propertyDefinition : propertyDefinitions) {
+// SortedSet<PropertyObject> propertySet = properties.get(propertyDefinition
+// .getPredicate());
+// if (propertySet == null) {
+// if (propertyDefinition.isRequired()) {
+// logger.fine(MessageFormat.format("Required property {0} is missing",
+// propertyDefinition.getPredicate()));
+// return false;
+// }
+// } else {
+// if (propertySet.size() == 0 && propertyDefinition.isRequired()) {
+// logger.fine(MessageFormat.format("Required property {0} is missing",
+// propertyDefinition.getPredicate()));
+// return false;
+// }
+// if (propertySet.size() > 1 && !propertyDefinition.isMultiple()) {
+// logger.fine(MessageFormat.format(
+// "{0} properties found for singleton property {1}", propertySet.size(),
+// propertyDefinition.getPredicate()));
+// return false;
+// }
+// if (propertySet.size() > 1 && propertyDefinition.isMultiple() && propertyDefinition.isOrdered()) {
+// logger.fine(MessageFormat.format(
+// "{0} property lists found for property {1}", propertySet.size(),
+// propertyDefinition.getPredicate()));
+// return false;
+// }
+// for (PropertyObject property : propertySet) {
+// if (propertyDefinition.isMultiple() && propertyDefinition.isOrdered()) {
+// if (property instanceof PropertyList) {
+// PropertyList propertyList = (PropertyList) property;
+// for (PropertyObject propertyObject : propertyList) {
+// if (!isValidProperty(configuration, propertyDefinition, propertyObject)) {
+// logger.fine(MessageFormat.format("Property {0} is invalid",
+// propertyDefinition.getPredicate()));
+// return false;
+// }
+// }
+// }
+//
+// } else if (!isValidProperty(configuration, propertyDefinition, property)) {
+// logger.fine(MessageFormat.format("Property {0} is invalid",
+// propertyDefinition.getPredicate()));
+// return false;
+// }
+// }
+// }
+// }
+// return true;
+// }
+//
+// /**
+// * @param propertyDefinition
+// * @param property
+// * @return
+// */
+// private boolean isValidProperty(Configuration configuration,
+// PropertyDefinition propertyDefinition, PropertyObject property) {
+// if (propertyDefinition instanceof PropertyLiteralDefinition) {
+// if (property instanceof PropertyLiteral) {
+// PropertyLiteralDefinition propertyLiteralDefinition = (PropertyLiteralDefinition) propertyDefinition;
+// PropertyLiteral propertyLiteral = (PropertyLiteral) property;
+// if (!propertyLiteral.getLiteralType().equals(
+// propertyLiteralDefinition.getLiteralType())) {
+// logger.fine(MessageFormat.format(
+// "Property type {0} does not equal property definition type {1}",
+// propertyLiteral.getLiteralType(),
+// propertyLiteralDefinition.getLiteralType()));
+// return false;
+// }
+// LinkedHashSet<String> options = propertyLiteralDefinition.getOptions();
+// if (options != null && options.size() > 0) {
+// if (!options.contains(propertyLiteral.getLiteralValue())) {
+// logger.fine(MessageFormat.format("Property value {0} is not permitted",
+// propertyLiteral.getLiteralValue()));
+// return false;
+// }
+// }
+// } else {
+// logger.fine(MessageFormat.format("Expected a PropertyLiteral but got a {0}",
+// property.getClass().getSimpleName()));
+// return false;
+// }
+// } else if (propertyDefinition instanceof PropertyReferenceDefinition) {
+// if (property instanceof PropertyReference) {
+// PropertyReferenceDefinition propertyReferenceDefinition = (PropertyReferenceDefinition) propertyDefinition;
+// PropertyReference propertyReference = (PropertyReference) property;
+// LinkedHashSet<URI> options = propertyReferenceDefinition.getOptions();
+// if (options != null && options.size() > 0) {
+// if (!options.contains(propertyReference.getResourceURI())) {
+// logger.fine(MessageFormat.format("Property value {0} is not permitted",
+// propertyReference.getResourceURI()));
+// return false;
+// }
+// }
+// } else {
+// logger.fine(MessageFormat.format("Expected a PropertyReference but got a {0}",
+// property.getClass().getSimpleName()));
+// return false;
+// }
+// } else if (propertyDefinition instanceof PropertyResourceDefinition) {
+// if (property instanceof PropertyResource) {
+// PropertyResourceDefinition propertyResourceDefinition = (PropertyResourceDefinition) propertyDefinition;
+// PropertyResource propertyResource = (PropertyResource) property;
+// return isValidPropertyResource(configuration, propertyResourceDefinition,
+// propertyResource);
+// } else if (property instanceof PropertyReference) {
+// // special cases where a PropertyResource is actually a reference to a WorkflowBundle component
+// PropertyReference propertyReference = (PropertyReference) property;
+// WorkflowBundle workflowBundle = scufl2Tools.findParent(WorkflowBundle.class,
+// configuration);
+// URI configUri = uriTools.uriForBean(configuration);
+// URI referenceUri = configUri.resolve(propertyReference.getResourceURI());
+// if (workflowBundle != null) {
+// URI predicate = propertyDefinition.getPredicate();
+// WorkflowBean workflowBean = uriTools.resolveUri(referenceUri, workflowBundle);
+// if (workflowBean == null) {
+// logger.fine(MessageFormat.format(
+// "Cannot resolve {0} in WorkflowBundle {1}",
+// propertyReference.getResourceURI(), workflowBundle.getName()));
+// }
+// if (predicate.equals(SCUFL2.resolve("#definesInputPort"))) {
+// if (workflowBean == null) {
+// return false;
+// }
+// if (!(workflowBean instanceof InputActivityPort)) {
+// logger.fine(MessageFormat.format(
+// "{0} resolved to a {1}, expected an InputActivityPort",
+// propertyReference.getResourceURI(), workflowBean.getClass()
+// .getSimpleName()));
+// return false;
+// }
+// } else if (predicate.equals(SCUFL2.resolve("#definesOutputPort"))) {
+// if (workflowBean == null) {
+// return false;
+// }
+// if (!(workflowBean instanceof OutputActivityPort)) {
+// logger.fine(MessageFormat.format(
+// "{0} resolved to a {1}, expected an OutputActivityPort",
+// propertyReference.getResourceURI(), workflowBean.getClass()
+// .getSimpleName()));
+// return false;
+// }
+// } else {
+// logger.fine(MessageFormat.format("Unexpected reference to {0}", predicate));
+// }
+// } else {
+// logger.fine(MessageFormat
+// .format("Cannot resolve reference to {0} because Configuration {1} not contained within a WorkflowBundle",
+// referenceUri, configuration.getName()));
+// }
+// } else {
+// logger.fine(MessageFormat.format("Expected a PropertyResource or PropertyReference but got a {0}",
+// property.getClass().getSimpleName()));
+// return false;
+// }
+// } else {
+// logger.fine(MessageFormat.format("Unknown property definition class {0}",
+// propertyDefinition.getClass().getSimpleName()));
+// return false;
+// }
+// return true;
+// }
+
+}