You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that was not preserved in this plain-text version.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/12 23:03:52 UTC

[1/9] git commit: Add to distributed cache executable flag, directory support

Repository: incubator-flink
Updated Branches:
  refs/heads/release-0.5.1 94d944af8 -> 67bb703d7


Add to distributed cache executable flag, directory support


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e9f42258
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e9f42258
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e9f42258

Branch: refs/heads/release-0.5.1
Commit: e9f42258ca65f5cb4918fb6eab743920e23bb87a
Parents: 94d944a
Author: zentol <s....@web.de>
Authored: Sat May 24 17:43:57 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:40 2014 +0200

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |  5 +-
 .../java/eu/stratosphere/api/common/Plan.java   | 21 ++---
 .../api/common/cache/DistributedCache.java      | 28 +++++--
 .../api/java/ExecutionEnvironment.java          | 30 +++++--
 .../nephele/taskmanager/TaskManager.java        |  7 +-
 .../pact/runtime/cache/FileCache.java           | 84 ++++++++++++++------
 .../cache/FileCacheDeleteValidationTest.java    | 14 ++--
 .../distributedCache/DistributedCacheTest.java  |  3 +-
 8 files changed, 131 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 53b4cc1..caff8de 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -30,6 +30,7 @@ import eu.stratosphere.api.common.aggregators.AggregatorWithName;
 import eu.stratosphere.api.common.aggregators.ConvergenceCriterion;
 import eu.stratosphere.api.common.aggregators.LongSumAggregator;
 import eu.stratosphere.api.common.cache.DistributedCache;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
 import eu.stratosphere.api.common.distributions.DataDistribution;
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.compiler.CompilerException;
@@ -206,8 +207,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		}
 
 		// add registered cache file into job configuration
-		for (Entry<String, String> e: program.getOriginalPactPlan().getCachedFiles()) {
-			DistributedCache.addCachedFile(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration());
+		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
+			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration());
 		}
 		JobGraph graph = this.jobGraph;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
index 6d4b356..dc3adb6 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
@@ -15,6 +15,7 @@ package eu.stratosphere.api.common;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
 
 import java.util.ArrayList;
 import java.util.Calendar;
@@ -66,7 +67,7 @@ public class Plan implements Visitable<Operator<?>> {
 	 */
 	protected int maxNumberMachines;
 
-	protected HashMap<String, String> cacheFile = new HashMap<String, String>();
+	protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap();
 
 	// ------------------------------------------------------------------------
 
@@ -301,28 +302,28 @@ public class Plan implements Visitable<Operator<?>> {
 	
 	/**
 	 *  register cache files in program level
-	 * @param filePath The files must be stored in a place that can be accessed from all workers (most commonly HDFS)
+	 * @param entry contains all relevant information
 	 * @param name user defined name of that file
 	 * @throws java.io.IOException
 	 */
-	public void registerCachedFile(String filePath, String name) throws IOException {
+	public void registerCachedFile(String name, DistributedCacheEntry entry) throws IOException {
 		if (!this.cacheFile.containsKey(name)) {
 			try {
-				URI u = new URI(filePath);
+				URI u = new URI(entry.filePath);
 				if (!u.getPath().startsWith("/")) {
-					u = new URI(new File(filePath).getAbsolutePath());
+					u = new File(entry.filePath).toURI();
 				}
 				FileSystem fs = FileSystem.get(u);
 				if (fs.exists(new Path(u.getPath()))) {
-					this.cacheFile.put(name, u.toString());
+					this.cacheFile.put(name, new DistributedCacheEntry(u.toString(), entry.isExecutable));
 				} else {
-					throw new RuntimeException("File " + u.toString() + " doesn't exist.");
+					throw new IOException("File " + u.toString() + " doesn't exist.");
 				}
 			} catch (URISyntaxException ex) {
-				throw new RuntimeException("Invalid path: " + filePath, ex);
+				throw new IOException("Invalid path: " + entry.filePath, ex);
 			}
 		} else {
-			throw new RuntimeException("cache file " + name + "already exists!");
+			throw new IOException("cache file " + name + "already exists!");
 		}
 	}
 
@@ -330,7 +331,7 @@ public class Plan implements Visitable<Operator<?>> {
 	 * return the registered caches files
 	 * @return Set of (name, filePath) pairs
 	 */
-	public Set<Entry<String,String>> getCachedFiles() {
+	public Set<Entry<String,DistributedCacheEntry>> getCachedFiles() {
 		return this.cacheFile.entrySet();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
index bd07ec8..0ddb46d 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
@@ -25,35 +25,49 @@ import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
 
 /**
- * DistributedCache provides static method to write the registered cache files into job configuration or decode
+ * DistributedCache provides static methods to write the registered cache files into job configuration or decode
  * them from job configuration. It also provides user access to the file locally.
  */
 public class DistributedCache {
+	
+	public static class DistributedCacheEntry {
+		public String filePath;
+		public Boolean isExecutable;
+		
+		public DistributedCacheEntry(String filePath, Boolean isExecutable){
+			this.filePath=filePath;
+			this.isExecutable=isExecutable;
+		}
+	}
 
 	final static String CACHE_FILE_NUM = "DISTRIBUTED_CACHE_FILE_NUM";
 
 	final static String CACHE_FILE_NAME = "DISTRIBUTED_CACHE_FILE_NAME_";
 
 	final static String CACHE_FILE_PATH = "DISTRIBUTED_CACHE_FILE_PATH_";
+	
+	final static String CACHE_FILE_EXE = "DISTRIBUTED_CACHE_FILE_EXE_";
 
 	public final static String TMP_PREFIX = "tmp_";
 
 	private Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
 
-	public static void addCachedFile(String name, String filePath, Configuration conf) {
+	public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, Configuration conf) {
 		int num = conf.getInteger(CACHE_FILE_NUM,0) + 1;
 		conf.setInteger(CACHE_FILE_NUM, num);
 		conf.setString(CACHE_FILE_NAME + num, name);
-		conf.setString(CACHE_FILE_PATH + num, filePath);
+		conf.setString(CACHE_FILE_PATH + num, e.filePath);
+		conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute());
 	}
 
-	public static Set<Entry<String,String>> getCachedFile(Configuration conf) {
-		Map<String, String> cacheFiles = new HashMap<String, String>();
-		int num = conf.getInteger(CACHE_FILE_NUM,0);
+	public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(Configuration conf) {
+		Map<String, DistributedCacheEntry> cacheFiles = new HashMap<String, DistributedCacheEntry>();
+		int num = conf.getInteger(CACHE_FILE_NUM, 0);
 		for (int i = 1; i <= num; i++) {
 			String name = conf.getString(CACHE_FILE_NAME + i, "");
 			String filePath = conf.getString(CACHE_FILE_PATH + i, "");
-			cacheFiles.put(name, filePath);
+			Boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
+			cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable));
 		}
 		return cacheFiles.entrySet();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
index 9b316cc..f04cddd 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.Validate;
 import eu.stratosphere.api.common.InvalidProgramException;
 import eu.stratosphere.api.common.JobExecutionResult;
 import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
 import eu.stratosphere.api.common.io.InputFormat;
 import eu.stratosphere.api.java.io.CollectionInputFormat;
 import eu.stratosphere.api.java.io.CsvReader;
@@ -87,7 +88,7 @@ public abstract class ExecutionEnvironment {
 	
 	private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
 	
-	private final List<Tuple2<String, String>> cacheFile = new ArrayList<Tuple2<String, String>>();
+	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList();
 
 	private int degreeOfParallelism = -1;
 	
@@ -542,8 +543,8 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Registers a file at the distributed cache under the given name. The file will be accessible
 	 * from any user-defined function in the (distributed) runtime under a local path. Files
-	 * may be local files, or files in a distributed file system. The runtime will copy the files
-	 * temporarily to a local cache, if needed.
+	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+	 * The runtime will copy the files temporarily to a local cache, if needed.
 	 * <p>
 	 * The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via
 	 * {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access 
@@ -554,7 +555,26 @@ public abstract class ExecutionEnvironment {
 	 * @param name The name under which the file is registered.
 	 */
 	public void registerCachedFile(String filePath, String name){
-		this.cacheFile.add(new Tuple2<String, String>(filePath, name));
+		registerCachedFile(filePath, name, false);
+	}
+	
+	/**
+	 * Registers a file at the distributed cache under the given name. The file will be accessible
+	 * from any user-defined function in the (distributed) runtime under a local path. Files
+	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. 
+	 * The runtime will copy the files temporarily to a local cache, if needed.
+	 * <p>
+	 * The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+	 * {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access 
+	 * {@link eu.stratosphere.api.common.cache.DistributedCache} via 
+	 * {@link eu.stratosphere.api.common.functions.RuntimeContext#getDistributedCache()}.
+	 * 
+	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+	 * @param name The name under which the file is registered.
+	 * @param executable flag indicating whether the file should be executable
+	 */
+	public void registerCachedFile(String filePath, String name, boolean executable){
+		this.cacheFile.add(new Tuple2(name, new DistributedCacheEntry(filePath, executable)));
 	}
 	
 	/**
@@ -565,7 +585,7 @@ public abstract class ExecutionEnvironment {
 	 * @throws IOException Thrown if checks for existence and sanity fail.
 	 */
 	protected void registerCachedFilesWithPlan(Plan p) throws IOException {
-		for (Tuple2<String, String> entry : cacheFile) {
+		for (Tuple2<String, DistributedCacheEntry> entry : cacheFile) {
 			p.registerCachedFile(entry.f0, entry.f1);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 3b478cf..ebbdeb5 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -50,6 +50,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import eu.stratosphere.api.common.cache.DistributedCache;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
 import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.configuration.GlobalConfiguration;
@@ -666,7 +667,7 @@ public class TaskManager implements TaskOperationProtocol {
 
 			// retrieve the registered cache files from job configuration and create the local tmp file.
 			Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
-			for (Entry<String, String> e: DistributedCache.getCachedFile(tdd.getJobConfiguration())) {
+			for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
 				FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
 				cpTasks.put(e.getKey(), cp);
 			}
@@ -801,8 +802,8 @@ public class TaskManager implements TaskOperationProtocol {
 			}
 
 			// remove the local tmp file for unregistered tasks.
-			for (Entry<String, String> e: DistributedCache.getCachedFile(task.getEnvironment().getJobConfiguration())) {
-				this.fileCache.deleteTmpFile(e.getKey(), task.getJobID());
+			for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
+				this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
 			}
 			// Unregister task from the byte buffered channel manager
 			this.channelManager.unregister(id, task);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index 26eb9fe..6a38cd8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -13,6 +13,8 @@
 
 package eu.stratosphere.pact.runtime.cache;
 
+import eu.stratosphere.api.common.cache.DistributedCache;
+import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -25,11 +27,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
-import eu.stratosphere.api.common.cache.DistributedCache;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
 import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.GlobalConfiguration;
 import eu.stratosphere.core.fs.FSDataInputStream;
 import eu.stratosphere.core.fs.FSDataOutputStream;
+import eu.stratosphere.core.fs.FileStatus;
 import eu.stratosphere.core.fs.FileSystem;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.fs.local.LocalFileSystem;
@@ -38,9 +41,9 @@ import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
 import eu.stratosphere.nephele.util.IOUtils;
 
 /**
- * FileCache is used to create the local tmp file for the registered cache file when a task is deployed. Also when the
- * task is unregistered, it will remove the local tmp file. Given that another task from the same job may be registered
- * shortly after, there exists a 5 second delay	before clearing the local tmp file.
+ * The FileCache is used to create the local files for the registered cache files when a task is deployed. 
+ * The files will be removed when the task is unregistered after a 5 second delay.
+ * A given file x will be placed in "<system-tmp-dir>/tmp_<jobID>/".
  */
 public class FileCache {
 
@@ -52,34 +55,40 @@ public class FileCache {
 
 	/**
 	 * If the file doesn't exists locally, it will copy the file to the temp directory.
+	 * @param name file identifier
+	 * @param entry entry containing all relevant information
+	 * @param jobID
+	 * @return copy task
 	 */
-	public FutureTask<Path> createTmpFile(String name, String filePath, JobID jobID) {
-
+	public FutureTask<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
 		synchronized (count) {
-			Pair<JobID, String> key = new ImmutablePair(jobID,name);
+			Pair<JobID, String> key = new ImmutablePair(jobID, name);
 			if (count.containsKey(key)) {
 				count.put(key, count.get(key) + 1);
 			} else {
 				count.put(key, 1);
 			}
 		}
-		CopyProcess cp = new CopyProcess(name, filePath, jobID);
+		CopyProcess cp = new CopyProcess(name, entry, jobID);
 		FutureTask<Path> copyTask = new FutureTask<Path>(cp);
 		executorService.submit(copyTask);
 		return copyTask;
 	}
 
 	/**
-	 * Leave a 5 seconds delay to clear the local file.
+	 * Deletes the local file after a 5 second delay.
+	 * @param name file identifier
+	 * @param entry entry containing all relevant information
+	 * @param jobID
 	 */
-	public void deleteTmpFile(String name, JobID jobID) {
-		DeleteProcess dp = new DeleteProcess(name, jobID, count.get(new ImmutablePair(jobID,name)));
+	public void deleteTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
+		DeleteProcess dp = new DeleteProcess(name, entry, jobID, count.get(new ImmutablePair(jobID,name)));
 		executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
 	}
 
-	public Path getTempDir(JobID jobID, String name) {
+	public Path getTempDir(JobID jobID, String childPath) {
 		return new Path(GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), DistributedCache.TMP_PREFIX + jobID.toString() + "_" +  name);
+			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), DistributedCache.TMP_PREFIX + jobID.toString() + "/" + childPath);
 	}
 
 	public void shutdown() {
@@ -100,49 +109,72 @@ public class FileCache {
 		private JobID jobID;
 		private String name;
 		private String filePath;
+		private Boolean executable;
 
-		public CopyProcess(String name, String filePath, JobID jobID) {
+		public CopyProcess(String name, DistributedCacheEntry e, JobID jobID) {
 			this.name = name;
-			this.filePath = filePath;
+			this.filePath = e.filePath;
+			this.executable = e.isExecutable;
 			this.jobID = jobID;
 		}
+		@Override
 		public Path call()  {
-			Path tmp = getTempDir(jobID, name);
+			Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
 			try {
-				if (!lfs.exists(tmp)) {
-					FSDataOutputStream lfsOutput = lfs.create(tmp, false);
-					Path distributedPath = new Path(filePath);
-					FileSystem fs = distributedPath.getFileSystem();
-					FSDataInputStream fsInput = fs.open(distributedPath);
-					IOUtils.copyBytes(fsInput, lfsOutput);
-				}
+				create(new Path(filePath), tmp);
 			} catch (IOException e1) {
 				throw new RuntimeException("Error copying a file from hdfs to the local fs", e1);
 			}
 			return tmp;
 		}
+		
+		private void create(Path distributedFilePath, Path localFilePath) throws IOException {
+			if (!lfs.exists(localFilePath)) {
+				FileSystem dfs = distributedFilePath.getFileSystem();
+				if (dfs.getFileStatus(distributedFilePath).isDir()) {
+					lfs.mkdirs(localFilePath);
+					FileStatus[] contents = dfs.listStatus(distributedFilePath);
+					for (FileStatus content : contents) {
+						String distPath = content.getPath().toString();
+						if (content.isDir()){
+							distPath = distPath.substring(0,distPath.length() - 1);
+						}
+						String localPath = localFilePath.toString() + distPath.substring(distPath.lastIndexOf("/"));
+						create(content.getPath(), new Path(localPath));
+					}
+				} else {
+					FSDataOutputStream lfsOutput = lfs.create(localFilePath, false);
+					FSDataInputStream fsInput = dfs.open(distributedFilePath);
+					IOUtils.copyBytes(fsInput, lfsOutput);
+					new File(localFilePath.toString()).setExecutable(executable);
+				}
+			}
+		}
 	}
+
 	/**
 	 * If no task is using this file after 5 seconds, clear it.
 	 */
 	private class DeleteProcess implements Runnable {
 		private String name;
+		private String filePath;
 		private JobID jobID;
 		private int oldCount;
 
-		public DeleteProcess(String name, JobID jobID, int c) {
+		public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
 			this.name = name;
+			this.filePath = e.filePath;
 			this.jobID = jobID;
 			this.oldCount = c;
 		}
-
+		@Override
 		public void run() {
 			synchronized (count) {
 				if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
 					return;
 				}
 			}
-			Path tmp = getTempDir(jobID, name);
+			Path tmp = getTempDir(jobID, "");
 			try {
 				if (lfs.exists(tmp)) {
 					lfs.delete(tmp, true);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
index 40b649a..2ea9ddf 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
@@ -24,8 +24,8 @@ import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
 
-import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.fs.local.LocalFileSystem;
 import eu.stratosphere.nephele.jobgraph.JobID;
 
@@ -67,13 +67,13 @@ public class FileCacheDeleteValidationTest {
 	public void testFileReuseForNextTask() {
 		JobID jobID = new JobID();
 		String filePath = f.toURI().toString();
-		fileCache.createTmpFile("test_file", filePath, jobID);
+		fileCache.createTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID);
 		try {
 			Thread.sleep(1000);
 		} catch (InterruptedException e) {
 			throw new RuntimeException("Interrupted error", e);
 		}
-		fileCache.deleteTmpFile("test_file", jobID);
+		fileCache.deleteTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID);
 		try {
 			Thread.sleep(1000);
 		} catch (InterruptedException e) {
@@ -81,17 +81,17 @@ public class FileCacheDeleteValidationTest {
 		}
 		//new task comes after 1 second
 		try {
-			Assert.assertTrue("Local cache file should not be deleted when another task comes in 5 seconds!", lfs.exists(fileCache.getTempDir(jobID, "test_file")));
+			Assert.assertTrue("Local cache file should not be deleted when another task comes in 5 seconds!", lfs.exists(fileCache.getTempDir(jobID, "cacheFile")));
 		} catch (IOException e) {
 			throw new RuntimeException("Interrupted error", e);
 		}
-		fileCache.createTmpFile("test_file", filePath, jobID);
+		fileCache.createTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID);
 		try {
 			Thread.sleep(1000);
 		} catch (InterruptedException e) {
 			throw new RuntimeException("Interrupted error", e);
 		}
-		fileCache.deleteTmpFile("test_file", jobID);
+		fileCache.deleteTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID);
 		try {
 			Thread.sleep(7000);
 		} catch (InterruptedException e) {
@@ -99,7 +99,7 @@ public class FileCacheDeleteValidationTest {
 		}
 		//no task comes in 7 seconds
 		try {
-			Assert.assertTrue("Local cache file should be deleted when no task comes in 5 seconds!", !lfs.exists(fileCache.getTempDir(jobID, "test_file")));
+			Assert.assertTrue("Local cache file should be deleted when no task comes in 5 seconds!", !lfs.exists(fileCache.getTempDir(jobID, "cacheFile")));
 		} catch (IOException e) {
 			throw new RuntimeException("Interrupted error", e);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
index 9fdcc42..6e46f82 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
@@ -14,6 +14,7 @@
 package eu.stratosphere.test.distributedCache;
 
 import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
 import eu.stratosphere.api.java.record.operators.FileDataSink;
 import eu.stratosphere.api.java.record.operators.FileDataSource;
 import eu.stratosphere.api.java.record.functions.MapFunction;
@@ -134,7 +135,7 @@ public class DistributedCacheTest extends RecordAPITestBase {
 	protected Plan getTestJob() {
 		Plan plan =  getPlan(1 , textPath, resultPath);
 		try {
-			plan.registerCachedFile(cachePath, "cache_test");
+			plan.registerCachedFile("cache_test", new DistributedCacheEntry(cachePath, false));
 		} catch (IOException ex) {
 			throw new RuntimeException(ex);
 		}		


[2/9] git commit: Distributed Cache general purpose copy method, thread safety

Posted by rm...@apache.org.
Distributed Cache general purpose copy method, thread safety


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1dd0acd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1dd0acd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1dd0acd4

Branch: refs/heads/release-0.5.1
Commit: 1dd0acd4d572b286f6afbb9562acffb3fdc20217
Parents: e9f4225
Author: zentol <s....@web.de>
Authored: Fri Jun 6 23:44:04 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:45 2014 +0200

----------------------------------------------------------------------
 .../pact/runtime/cache/FileCache.java           | 58 ++++++++++++--------
 1 file changed, 34 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1dd0acd4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index 6a38cd8..882c3a9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -49,6 +49,8 @@ public class FileCache {
 
 	private LocalFileSystem lfs = new LocalFileSystem();
 
+	private static final Object lock = new Object();
+
 	private Map<Pair<JobID, String>, Integer> count = new HashMap<Pair<JobID,String>, Integer>();
 
 	private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
@@ -102,6 +104,35 @@ public class FileCache {
 		}
 	}
 
+	public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
+		FileSystem sFS = sourcePath.getFileSystem();
+		FileSystem tFS = targetPath.getFileSystem();
+		if (!tFS.exists(targetPath)) {
+			if (sFS.getFileStatus(sourcePath).isDir()) {
+				tFS.mkdirs(targetPath);
+				FileStatus[] contents = sFS.listStatus(sourcePath);
+				for (FileStatus content : contents) {
+					String distPath = content.getPath().toString();
+					if (content.isDir()) {
+						if (distPath.endsWith("/")) {
+							distPath = distPath.substring(0, distPath.length() - 1);
+						}
+					}
+					String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
+					copy(content.getPath(), new Path(localPath), executable);
+				}
+			} else {
+				try {
+					FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
+					FSDataInputStream fsInput = sFS.open(sourcePath);
+					IOUtils.copyBytes(fsInput, lfsOutput);
+					new File(targetPath.toString()).setExecutable(executable);
+				} catch (IOException ioe) {
+				}
+			}
+		}
+	}
+
 	/**
 	 * Asynchronous file copy process
 	 */
@@ -121,35 +152,14 @@ public class FileCache {
 		public Path call()  {
 			Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
 			try {
-				create(new Path(filePath), tmp);
+				synchronized (lock) {
+					copy(new Path(filePath), tmp, this.executable);
+				}
 			} catch (IOException e1) {
 				throw new RuntimeException("Error copying a file from hdfs to the local fs", e1);
 			}
 			return tmp;
 		}
-		
-		private void create(Path distributedFilePath, Path localFilePath) throws IOException {
-			if (!lfs.exists(localFilePath)) {
-				FileSystem dfs = distributedFilePath.getFileSystem();
-				if (dfs.getFileStatus(distributedFilePath).isDir()) {
-					lfs.mkdirs(localFilePath);
-					FileStatus[] contents = dfs.listStatus(distributedFilePath);
-					for (FileStatus content : contents) {
-						String distPath = content.getPath().toString();
-						if (content.isDir()){
-							distPath = distPath.substring(0,distPath.length() - 1);
-						}
-						String localPath = localFilePath.toString() + distPath.substring(distPath.lastIndexOf("/"));
-						create(content.getPath(), new Path(localPath));
-					}
-				} else {
-					FSDataOutputStream lfsOutput = lfs.create(localFilePath, false);
-					FSDataInputStream fsInput = dfs.open(distributedFilePath);
-					IOUtils.copyBytes(fsInput, lfsOutput);
-					new File(localFilePath.toString()).setExecutable(executable);
-				}
-			}
-		}
 	}
 
 	/**


[8/9] git commit: quick and dirty fix for FLINK-927 and added more diagnostic information

Posted by rm...@apache.org.
quick and dirty fix for FLINK-927 and added more diagnostic information


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/21ecaaa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/21ecaaa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/21ecaaa5

Branch: refs/heads/release-0.5.1
Commit: 21ecaaa549fe052a5a636066d92495246bb4bdec
Parents: 54f3059
Author: rwaury <ro...@googlemail.com>
Authored: Thu Jun 12 16:18:59 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:14 2014 +0200

----------------------------------------------------------------------
 .../pact/runtime/hash/CompactingHashTable.java  | 36 +++++++++++++++++---
 1 file changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21ecaaa5/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java
index 84da223..3f00493 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java
@@ -337,6 +337,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	}
 	
 	public final void insert(T record) throws IOException {
+		if(this.closed.get()) {
+			return;
+		}
 		final int hashCode = hash(this.buildSideComparator.hash(record));
 		final int posHashCode = hashCode % this.numBuckets;
 		
@@ -365,12 +368,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
 						" minPartition: " + getMinPartition() +
 						" maxPartition: " + getMaxPartition() +
+						" number of overflow segments: " + getOverflowSegmentCount() +
 						" bucketSize: " + this.buckets.length +
 						" Message: " + ex.getMessage());
 			} catch (IndexOutOfBoundsException ex) {
 				throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
 						" minPartition: " + getMinPartition() +
 						" maxPartition: " + getMaxPartition() +
+						" number of overflow segments: " + getOverflowSegmentCount() +
 						" bucketSize: " + this.buckets.length +
 						" Message: " + ex.getMessage());
 			}
@@ -383,12 +388,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
 						" minPartition: " + getMinPartition() +
 						" maxPartition: " + getMaxPartition() +
+						" number of overflow segments: " + getOverflowSegmentCount() +
 						" bucketSize: " + this.buckets.length +
 						" Message: " + ex.getMessage());
 			} catch (IndexOutOfBoundsException ex) {
 				throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
 						" minPartition: " + getMinPartition() +
 						" maxPartition: " + getMaxPartition() +
+						" number of overflow segments: " + getOverflowSegmentCount() +
 						" bucketSize: " + this.buckets.length +
 						" Message: " + ex.getMessage());
 			}
@@ -420,6 +427,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * @throws IOException
 	 */
 	public void insertOrReplaceRecord(T record, T tempHolder) throws IOException {
+		if(this.closed.get()) {
+			return;
+		}
 		final int searchHashCode = hash(this.buildSideComparator.hash(record));
 		final int posHashCode = searchHashCode % this.numBuckets;
 		
@@ -480,12 +490,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 							throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
 									" minPartition: " + getMinPartition() +
 									" maxPartition: " + getMaxPartition() +
+									" number of overflow segments: " + getOverflowSegmentCount() +
 									" bucketSize: " + this.buckets.length +
 									" Message: " + ex.getMessage());
 						} catch (IndexOutOfBoundsException ex) {
 							throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
 									" minPartition: " + getMinPartition() +
 									" maxPartition: " + getMaxPartition() +
+									" number of overflow segments: " + getOverflowSegmentCount() +
 									" bucketSize: " + this.buckets.length +
 									" Message: " + ex.getMessage());
 						}
@@ -502,12 +514,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 							throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
 									" minPartition: " + getMinPartition() +
 									" maxPartition: " + getMaxPartition() +
+									" number of overflow segments: " + getOverflowSegmentCount() +
 									" bucketSize: " + this.buckets.length +
 									" Message: " + ex.getMessage());
 						} catch (IndexOutOfBoundsException ex) {
 							throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
 									" minPartition: " + getMinPartition() +
 									" maxPartition: " + getMaxPartition() +
+									" number of overflow segments: " + getOverflowSegmentCount() +
 									" bucketSize: " + this.buckets.length +
 									" Message: " + ex.getMessage());
 						}
@@ -766,8 +780,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			throw new RuntimeException("Memory ran out. numPartitions: " + this.partitions.size() + 
 													" minPartition: " + getMinPartition() +
 													" maxPartition: " + getMaxPartition() + 
+													" number of overflow segments: " + getOverflowSegmentCount() +
 													" bucketSize: " + this.buckets.length);
-			//throw new RuntimeException("The hash table ran out of memory.");
 		}
 	}
 
@@ -809,6 +823,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		return minPartition;
 	}
 	
+	/**
+	 * Sums {@code numOverflowSegments} over all partitions; used only for diagnostic messages.
+	 */
+	private int getOverflowSegmentCount() {
+		int total = 0;
+		for (int i = 0; i < this.partitions.size(); i++) {
+			total += this.partitions.get(i).numOverflowSegments;
+		}
+		return total;
+	}
+	
 	private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
 		// ----------------------------------------------------------------------------------------
 		// the following observations hold:
@@ -853,8 +875,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * @throws IOException 
 	 */
 	private void compactPartition(int partitionNumber) throws IOException {
-		// stop if no garbage exists
-		if(this.partitions.get(partitionNumber).isCompacted()) {
+		// stop if no garbage exists or table is closed
+		if(this.partitions.get(partitionNumber).isCompacted() || this.closed.get()) {
 			return;
 		}
 		// release all segments owned by compaction partition
@@ -998,7 +1020,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 
 		@Override
 		public T next(T reuse) throws IOException {
-			if(done) {
+			if(done || this.table.closed.get()) {
 				return null;
 			} else if(!cache.isEmpty()) {
 				reuse = cache.remove(cache.size()-1);
@@ -1084,6 +1106,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		}
 		
 		public boolean getMatchFor(PT probeSideRecord, T targetForMatch) {
+			if(closed.get()) {
+				return false;
+			}
 			final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
 			
 			final int posHashCode = searchHashCode % numBuckets;
@@ -1154,6 +1179,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		}
 		
 		public void updateMatch(T record) throws IOException {
+			if(closed.get()) {
+				return;
+			}
 			long newPointer = this.partition.appendRecord(record);
 			this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
 			this.partition.setCompaction(false); //FIXME Do we really create garbage here?


[6/9] git commit: Allow multiple successive programs on the same execution environment.

Posted by rm...@apache.org.
Allow multiple successive programs on the same execution environment.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2a165eeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2a165eeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2a165eeb

Branch: refs/heads/release-0.5.1
Commit: 2a165eeb5094915b7df81537f963136696e8cc8b
Parents: 51b793f
Author: StephanEwen <st...@tu-berlin.de>
Authored: Thu Jun 12 16:06:04 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:05 2014 +0200

----------------------------------------------------------------------
 .../api/java/ExecutionEnvironment.java          |  7 +-
 .../api/java/io/DiscardingOutputFormat.java     | 41 ++++++++++++
 .../api/java/MultipleInvokationsTest.java       | 68 ++++++++++++++++++++
 3 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
index f04cddd..2f7aef3 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
@@ -88,7 +88,7 @@ public abstract class ExecutionEnvironment {
 	
 	private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
 	
-	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList();
+	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
 
 	private int degreeOfParallelism = -1;
 	
@@ -574,7 +574,7 @@ public abstract class ExecutionEnvironment {
 	 * @param executable flag indicating whether the file should be executable
 	 */
 	public void registerCachedFile(String filePath, String name, boolean executable){
-		this.cacheFile.add(new Tuple2(name, new DistributedCacheEntry(filePath, executable)));
+		this.cacheFile.add(new Tuple2<String, DistributedCacheEntry>(name, new DistributedCacheEntry(filePath, executable)));
 	}
 	
 	/**
@@ -635,6 +635,9 @@ public abstract class ExecutionEnvironment {
 			throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
 		}
 		
+		// clear all the sinks such that the next execution does not redo everything
+		this.sinks.clear();
+		
 		return plan;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
new file mode 100644
index 0000000..44912c9
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
@@ -0,0 +1,41 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.io;
+
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.configuration.Configuration;
+
+/**
+ * An output format that simply discards all elements.
+ *
+ * @param <T> The type of the elements accepted by the output format.
+ */
+public class DiscardingOutputFormat<T> implements OutputFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void configure(Configuration parameters) {}
+
+	@Override
+	public void open(int taskNumber, int numTasks) {}
+
+	@Override
+	public void writeRecord(T record) {}
+
+	@Override
+	public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
new file mode 100644
index 0000000..8159ec0
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
@@ -0,0 +1,68 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
+import eu.stratosphere.api.java.io.DiscardingOutputFormat;
+
+/**
+ * Checks that {@link ExecutionEnvironment#createProgramPlan()} can be called repeatedly on the same
+ * environment, and that each resulting plan contains only the sinks defined since the previous call.
+ */
+public class MultipleInvokationsTest {
+
+	@Test
+	public void testMultipleInvocationsGetPlan() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			// ----------- Execution 1 ---------------
+			
+			DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
+			data.print().name("print1");
+			data.output(new DiscardingOutputFormat<String>()).name("output1");
+			
+			{
+				Plan p = env.createProgramPlan();
+				
+				assertEquals(2, p.getDataSinks().size());
+				for (GenericDataSinkBase<?> sink : p.getDataSinks()) {
+					assertTrue(sink.getName().equals("print1") || sink.getName().equals("output1"));
+					assertEquals("source1", sink.getInput().getName());
+				}
+			}
+			
+			// ----------- Execution 2 ---------------
+			
+			// only the sink added after the first plan was created may appear in the second plan
+			data.writeAsText("/some/file/path").name("textsink");
+			
+			{
+				Plan p = env.createProgramPlan();
+			
+				assertEquals(1, p.getDataSinks().size());
+				GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+				assertEquals("textsink", sink.getName());
+				assertEquals("source1", sink.getInput().getName());
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


[4/9] git commit: added own instantiate method to CoGroupWithSolutionSetFirstDescriptor; added Test

Posted by rm...@apache.org.
added own instantiate method to CoGroupWithSolutionSetFirstDescriptor; added Test


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f403970e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f403970e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f403970e

Branch: refs/heads/release-0.5.1
Commit: f403970eb737e703e85927d05ba4ac48a594310a
Parents: 387bb5d
Author: Sebastian Kunert <sk...@gmail.com>
Authored: Thu Jun 12 14:06:24 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:55 2014 +0200

----------------------------------------------------------------------
 .../compiler/operators/CoGroupDescriptor.java   |   2 +-
 .../CoGroupWithSolutionSetFirstDescriptor.java  |  22 +++-
 .../compiler/CoGroupSolutionSetFirstTest.java   | 103 +++++++++++++++++++
 3 files changed, 125 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
index e2614ea..edc6c69 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
@@ -128,7 +128,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
-		boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
+		boolean[] inputOrders = in1.getLocalProperties().getOrdering() == null ? null : in1.getLocalProperties().getOrdering().getFieldSortDirections();
 		
 		if (inputOrders == null || inputOrders.length < this.keys1.size()) {
 			throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
index 90d0662..e8dd433 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
@@ -17,9 +17,14 @@ import java.util.Collections;
 import java.util.List;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.compiler.dag.TwoInputNode;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
 import eu.stratosphere.compiler.util.Utils;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
 
 /**
  * 
@@ -36,7 +41,22 @@ public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor {
 		RequestedLocalProperties sort = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
 		return Collections.singletonList(new LocalPropertiesPair(none, sort));
 	}
-	
+
+	/**
+	 * Creates the CoGroup plan node. Unlike the base descriptor, the sort directions are taken from the
+	 * second input: the first input (the solution set) is requested without any local ordering.
+	 */
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+		boolean[] directions = null;
+		if (in2.getLocalProperties().getOrdering() != null) {
+			directions = in2.getLocalProperties().getOrdering().getFieldSortDirections();
+		}
+
+		final int numKeys = this.keys2.size();
+		if (directions == null || directions.length < numKeys) {
+			throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");
+		}
+		if (directions.length > numKeys) {
+			// keep only the directions for the CoGroup key fields
+			final boolean[] truncated = new boolean[numKeys];
+			System.arraycopy(directions, 0, truncated, 0, numKeys);
+			directions = truncated;
+		}
+
+		final String nodeName = "CoGroup (" + node.getPactContract().getName() + ")";
+		return new DualInputPlanNode(node, nodeName, in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, directions);
+	}
+
 	@Override
 	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
 			LocalProperties produced1, LocalProperties produced2)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
new file mode 100644
index 0000000..d0fca91
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -0,0 +1,103 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.CoGroupFunction;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+/**
+ * Compiler test for a CoGroup inside a delta iteration whose first input is the solution set.
+ */
+public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
+
+	/** No-op CoGroup; only the shape of the compiled plan matters here. */
+	public static class SimpleCGroup extends CoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception {
+		}
+	}
+
+	/** Placeholder map; the test only compiles the plan and never runs it. */
+	public static class SimpleMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+			return null;
+		}
+	}
+
+	/**
+	 * Verifies the compiled solution-set CoGroup:
+	 * <ul>
+	 * <li>the solution-set side (input 1) is forwarded with no local ordering;</li>
+	 * <li>the other side (input 2) is hash-partitioned and ordered on field 0.</li>
+	 * </ul>
+	 */
+	@Test
+	public void testCoGroupSolutionSet() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class);
+
+		DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0);
+
+		DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap());
+		DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup());
+		DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
+		DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
+
+		result.print();
+
+		Plan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = null;
+		try {
+			oPlan = compileNoStats(plan);
+		} catch(CompilerException e) {
+			Assert.fail(e.getMessage());
+		}
+
+		// Without this flag the test would pass silently if the iteration node were never visited.
+		final boolean[] checkedIteration = new boolean[1];
+
+		oPlan.accept(new Visitor<PlanNode>() {
+			@Override
+			public boolean preVisit(PlanNode visitable) {
+				if (visitable instanceof WorksetIterationPlanNode) {
+					PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
+
+					//get the CoGroup
+					DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().next().getSource();
+					Channel in1 = dpn.getInput1();
+					Channel in2 = dpn.getInput2();
+
+					Assert.assertNull(in1.getLocalProperties().getOrdering());
+					Assert.assertNotNull(in2.getLocalProperties().getOrdering());
+					Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
+					Assert.assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
+					Assert.assertEquals(ShipStrategyType.PARTITION_HASH, in2.getShipStrategy());
+					checkedIteration[0] = true;
+					return false;
+				}
+				return true;
+			}
+
+			@Override
+			public void postVisit(PlanNode visitable) {
+			}
+		});
+
+		Assert.assertTrue("The workset iteration node was never visited.", checkedIteration[0]);
+	}
+
+}


[7/9] git commit: Renamed Network Speed test to exclude the long benchmark from regular test cycles

Posted by rm...@apache.org.
Renamed Network Speed test to exclude the long benchmark from regular test cycles


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/54f30591
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/54f30591
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/54f30591

Branch: refs/heads/release-0.5.1
Commit: 54f305919e0d1da147d2759ee32ea8d189ca3f3d
Parents: 2a165ee
Author: StephanEwen <st...@tu-berlin.de>
Authored: Thu Jun 12 17:50:30 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:10 2014 +0200

----------------------------------------------------------------------
 .../test/runtime/NetworkStackNepheleITCase.java | 286 -------------------
 .../test/runtime/NetworkStackThroughput.java    | 286 +++++++++++++++++++
 2 files changed, 286 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/54f30591/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
deleted file mode 100644
index 64026a2..0000000
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.test.runtime;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.jobgraph.DistributionPattern;
-import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-import eu.stratosphere.runtime.io.api.RecordReader;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.runtime.io.channels.ChannelType;
-import eu.stratosphere.test.util.RecordAPITestBase;
-import eu.stratosphere.util.LogUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class NetworkStackNepheleITCase extends RecordAPITestBase {
-
-	private static final Log LOG = LogFactory.getLog(NetworkStackNepheleITCase.class);
-
-	private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
-
-	private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
-
-	private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
-
-	private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
-
-	private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
-
-	private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
-
-	private static final int IS_SLOW_SLEEP_MS = 10;
-
-	private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
-
-	// ------------------------------------------------------------------------
-
-	public NetworkStackNepheleITCase(Configuration config) {
-		super(config);
-
-		setNumTaskManager(2);
-		LogUtils.initializeDefaultConsoleLogger();
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Object[][] configParams = new Object[][]{
-				new Object[]{1, false, false, false, 4, 2},
-				new Object[]{1, true, false, false, 4, 2},
-				new Object[]{1, true, true, false, 4, 2},
-				new Object[]{1, true, false, true, 4, 2},
-				new Object[]{2, true, false, false, 4, 2},
-				new Object[]{4, true, false, false, 4, 2},
-				new Object[]{4, true, false, false, 8, 4},
-				new Object[]{4, true, false, false, 16, 8},
-		};
-
-		List<Configuration> configs = new ArrayList<Configuration>(configParams.length);
-		for (Object[] p : configParams) {
-			Configuration config = new Configuration();
-			config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
-			config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
-			config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
-			config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
-			config.setInteger(NUM_SUBTASKS_CONFIG_KEY, (Integer) p[4]);
-			config.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, (Integer) p[5]);
-
-			configs.add(config);
-		}
-
-		return toParameterList(configs);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-		boolean useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
-		boolean isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
-		boolean isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
-		int numSubtasks = this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1);
-		int numSubtasksPerInstance = this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, 1);
-
-		return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, numSubtasks, numSubtasksPerInstance);
-	}
-
-	@After
-	public void calculateThroughput() {
-		if (getJobExecutionResult() != null) {
-			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-
-			double dataVolumeMbit = dataVolumeGb * 8192.0;
-			double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
-
-			int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
-
-			LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
-					"runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
-		}
-	}
-
-	private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver,
-									int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException {
-
-		JobGraph jobGraph = new JobGraph("Speed Test");
-
-		JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
-		producer.setInputClass(SpeedTestProducer.class);
-		producer.setNumberOfSubtasks(numSubtasks);
-		producer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
-		producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
-		producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
-
-		JobTaskVertex forwarder = null;
-		if (useForwarder) {
-			forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
-			forwarder.setTaskClass(SpeedTestForwarder.class);
-			forwarder.setNumberOfSubtasks(numSubtasks);
-			forwarder.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
-		}
-
-		JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
-		consumer.setOutputClass(SpeedTestConsumer.class);
-		consumer.setNumberOfSubtasks(numSubtasks);
-		consumer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
-		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
-
-		if (useForwarder) {
-			producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-
-			forwarder.setVertexToShareInstancesWith(producer);
-			consumer.setVertexToShareInstancesWith(producer);
-		}
-		else {
-			producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			producer.setVertexToShareInstancesWith(consumer);
-		}
-
-		return jobGraph;
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static class SpeedTestProducer extends AbstractGenericInputTask {
-
-		private RecordWriter<SpeedTestRecord> writer;
-
-		@Override
-		public void registerInputOutput() {
-			this.writer = new RecordWriter<SpeedTestRecord>(this);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			this.writer.initializeSerializers();
-
-			// Determine the amount of data to send per subtask
-			int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackNepheleITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
-
-			long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
-			long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
-
-			LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
-					getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
-					SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));
-
-			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
-
-			int numRecords = 0;
-			SpeedTestRecord record = new SpeedTestRecord();
-			for (long i = 0; i < numRecordsToEmit; i++) {
-				if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
-					Thread.sleep(IS_SLOW_SLEEP_MS);
-				}
-
-				this.writer.emit(record);
-			}
-
-			this.writer.flush();
-		}
-	}
-
-	public static class SpeedTestForwarder extends AbstractTask {
-
-		private RecordReader<SpeedTestRecord> reader;
-
-		private RecordWriter<SpeedTestRecord> writer;
-
-		@Override
-		public void registerInputOutput() {
-			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
-			this.writer = new RecordWriter<SpeedTestRecord>(this);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			this.writer.initializeSerializers();
-
-			SpeedTestRecord record;
-			while ((record = this.reader.next()) != null) {
-				this.writer.emit(record);
-			}
-
-			this.writer.flush();
-		}
-	}
-
-	public static class SpeedTestConsumer extends AbstractOutputTask {
-
-		private RecordReader<SpeedTestRecord> reader;
-
-		@Override
-		public void registerInputOutput() {
-			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
-
-			int numRecords = 0;
-			while (this.reader.next() != null) {
-				if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
-					Thread.sleep(IS_SLOW_SLEEP_MS);
-				}
-			}
-		}
-	}
-
-	public static class SpeedTestRecord implements IOReadableWritable {
-
-		private static final int RECORD_SIZE = 128;
-
-		private final byte[] buf = new byte[RECORD_SIZE];
-
-		public SpeedTestRecord() {
-			for (int i = 0; i < RECORD_SIZE; ++i) {
-				this.buf[i] = (byte) (i % 128);
-			}
-		}
-
-		@Override
-		public void write(DataOutput out) throws IOException {
-			out.write(this.buf);
-		}
-
-		@Override
-		public void read(DataInput in) throws IOException {
-			in.readFully(this.buf);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/54f30591/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
new file mode 100644
index 0000000..fae3f99
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
@@ -0,0 +1,286 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.runtime;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
+import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
+import eu.stratosphere.nephele.template.AbstractGenericInputTask;
+import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.util.LogUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class NetworkStackThroughput extends RecordAPITestBase {
+
+	private static final Log LOG = LogFactory.getLog(NetworkStackThroughput.class);
+
+	private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
+
+	private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
+
+	private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
+
+	private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
+
+	private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
+
+	private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
+
+	private static final int IS_SLOW_SLEEP_MS = 10;
+
+	private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+	// ------------------------------------------------------------------------
+
+	public NetworkStackThroughput(Configuration config) {
+		super(config);
+
+		setNumTaskManager(2);
+		LogUtils.initializeDefaultConsoleLogger();
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() {
+		Object[][] configParams = new Object[][]{
+				new Object[]{1, false, false, false, 4, 2},
+				new Object[]{1, true, false, false, 4, 2},
+				new Object[]{1, true, true, false, 4, 2},
+				new Object[]{1, true, false, true, 4, 2},
+				new Object[]{2, true, false, false, 4, 2},
+				new Object[]{4, true, false, false, 4, 2},
+				new Object[]{4, true, false, false, 8, 4},
+				new Object[]{4, true, false, false, 16, 8},
+		};
+
+		List<Configuration> configs = new ArrayList<Configuration>(configParams.length);
+		for (Object[] p : configParams) {
+			Configuration config = new Configuration();
+			config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
+			config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
+			config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
+			config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
+			config.setInteger(NUM_SUBTASKS_CONFIG_KEY, (Integer) p[4]);
+			config.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, (Integer) p[5]);
+
+			configs.add(config);
+		}
+
+		return toParameterList(configs);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected JobGraph getJobGraph() throws Exception {
+		int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+		boolean useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
+		boolean isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+		boolean isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+		int numSubtasks = this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1);
+		int numSubtasksPerInstance = this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, 1);
+
+		return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, numSubtasks, numSubtasksPerInstance);
+	}
+
+	@After
+	public void calculateThroughput() {
+		if (getJobExecutionResult() != null) {
+			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+			double dataVolumeMbit = dataVolumeGb * 8192.0;
+			double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
+
+			int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
+
+			LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
+					"runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
+		}
+	}
+
+	private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver,
+									int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException {
+
+		JobGraph jobGraph = new JobGraph("Speed Test");
+
+		JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
+		producer.setInputClass(SpeedTestProducer.class);
+		producer.setNumberOfSubtasks(numSubtasks);
+		producer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+		producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
+		producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
+
+		JobTaskVertex forwarder = null;
+		if (useForwarder) {
+			forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
+			forwarder.setTaskClass(SpeedTestForwarder.class);
+			forwarder.setNumberOfSubtasks(numSubtasks);
+			forwarder.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+		}
+
+		JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
+		consumer.setOutputClass(SpeedTestConsumer.class);
+		consumer.setNumberOfSubtasks(numSubtasks);
+		consumer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
+
+		if (useForwarder) {
+			producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+			forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+
+			forwarder.setVertexToShareInstancesWith(producer);
+			consumer.setVertexToShareInstancesWith(producer);
+		}
+		else {
+			producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+			producer.setVertexToShareInstancesWith(consumer);
+		}
+
+		return jobGraph;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static class SpeedTestProducer extends AbstractGenericInputTask {
+
+		private RecordWriter<SpeedTestRecord> writer;
+
+		@Override
+		public void registerInputOutput() {
+			this.writer = new RecordWriter<SpeedTestRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			this.writer.initializeSerializers();
+
+			// Determine the amount of data to send per subtask
+			int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackThroughput.DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+			long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
+			long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+			LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
+					getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
+					SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));
+
+			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+
+			int numRecords = 0;
+			SpeedTestRecord record = new SpeedTestRecord();
+			for (long i = 0; i < numRecordsToEmit; i++) {
+				if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+					Thread.sleep(IS_SLOW_SLEEP_MS);
+				}
+
+				this.writer.emit(record);
+			}
+
+			this.writer.flush();
+		}
+	}
+
+	public static class SpeedTestForwarder extends AbstractTask {
+
+		private RecordReader<SpeedTestRecord> reader;
+
+		private RecordWriter<SpeedTestRecord> writer;
+
+		@Override
+		public void registerInputOutput() {
+			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+			this.writer = new RecordWriter<SpeedTestRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			this.writer.initializeSerializers();
+
+			SpeedTestRecord record;
+			while ((record = this.reader.next()) != null) {
+				this.writer.emit(record);
+			}
+
+			this.writer.flush();
+		}
+	}
+
+	public static class SpeedTestConsumer extends AbstractOutputTask {
+
+		private RecordReader<SpeedTestRecord> reader;
+
+		@Override
+		public void registerInputOutput() {
+			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+
+			int numRecords = 0;
+			while (this.reader.next() != null) {
+				if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+					Thread.sleep(IS_SLOW_SLEEP_MS);
+				}
+			}
+		}
+	}
+
+	public static class SpeedTestRecord implements IOReadableWritable {
+
+		private static final int RECORD_SIZE = 128;
+
+		private final byte[] buf = new byte[RECORD_SIZE];
+
+		public SpeedTestRecord() {
+			for (int i = 0; i < RECORD_SIZE; ++i) {
+				this.buf[i] = (byte) (i % 128);
+			}
+		}
+
+		@Override
+		public void write(DataOutput out) throws IOException {
+			out.write(this.buf);
+		}
+
+		@Override
+		public void read(DataInput in) throws IOException {
+			in.readFully(this.buf);
+		}
+	}
+}


[5/9] git commit: FS.get(): Add URI causing exception to message.

Posted by rm...@apache.org.
FS.get(): Add URI causing exception to message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/51b793fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/51b793fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/51b793fd

Branch: refs/heads/release-0.5.1
Commit: 51b793fdcf14cd0263563811be43c82290c337c6
Parents: f403970
Author: zentol <s....@web.de>
Authored: Thu Jun 12 13:10:40 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:00 2014 +0200

----------------------------------------------------------------------
 .../src/main/java/eu/stratosphere/core/fs/FileSystem.java      | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51b793fd/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
index e6d187e..007a74f 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
@@ -200,7 +200,8 @@ public abstract class FileSystem {
 				}
 				catch (URISyntaxException e) {
 					// we tried to repair it, but could not. report the scheme error
-					throw new IOException("FileSystem: Scheme is null. file:// or hdfs:// are example schemes.");
+					throw new IOException("FileSystem: Scheme is null. file:// or hdfs:// are example schemes. "
+							+ "Failed for " + uri.toString() + ".");
 				}
 			}
 
@@ -213,7 +214,8 @@ public abstract class FileSystem {
 
 			// Try to create a new file system
 			if (!FSDIRECTORY.containsKey(uri.getScheme())) {
-				throw new IOException("No file system found with scheme " + uri.getScheme());
+				throw new IOException("No file system found with scheme " + uri.getScheme()
+				+ ". Failed for " + uri.toString() + ".");
 			}
 
 			Class<? extends FileSystem> fsClass = null;


[3/9] git commit: Enable additional spargel compiler test

Posted by rm...@apache.org.
Enable additional spargel compiler test


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/387bb5d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/387bb5d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/387bb5d3

Branch: refs/heads/release-0.5.1
Commit: 387bb5d361374e1aa89bfc2d408c1d90ee1e961c
Parents: 1dd0acd
Author: StephanEwen <st...@tu-berlin.de>
Authored: Thu Jun 12 14:25:46 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:50 2014 +0200

----------------------------------------------------------------------
 .../java/eu/stratosphere/spargel/java/SpargelCompilerTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/387bb5d3/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java b/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java
index 8b280f2..7a7d251 100644
--- a/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java
+++ b/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java
@@ -41,7 +41,7 @@ import eu.stratosphere.test.compiler.util.CompilerTestBase;
 
 public class SpargelCompilerTest extends CompilerTestBase {
 
-//	@Test
+	@Test
 	public void testSpargelCompiler() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


[9/9] git commit: Set version to 0.5.1

Posted by rm...@apache.org.
Set version to 0.5.1


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/67bb703d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/67bb703d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/67bb703d

Branch: refs/heads/release-0.5.1
Commit: 67bb703d794706b6d0a029743a0a6964e7cdd9cd
Parents: 21ecaaa
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Jun 12 21:29:16 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 21:29:16 2014 +0200

----------------------------------------------------------------------
 pom.xml                                                          | 2 +-
 stratosphere-addons/avro/pom.xml                                 | 2 +-
 stratosphere-addons/hadoop-compatibility/pom.xml                 | 2 +-
 stratosphere-addons/hbase/pom.xml                                | 2 +-
 stratosphere-addons/jdbc/pom.xml                                 | 2 +-
 stratosphere-addons/pom.xml                                      | 2 +-
 stratosphere-addons/spargel/pom.xml                              | 2 +-
 stratosphere-addons/yarn/pom.xml                                 | 2 +-
 stratosphere-clients/pom.xml                                     | 2 +-
 stratosphere-compiler/pom.xml                                    | 2 +-
 stratosphere-core/pom.xml                                        | 2 +-
 stratosphere-dist/pom.xml                                        | 2 +-
 stratosphere-examples/pom.xml                                    | 2 +-
 stratosphere-examples/stratosphere-java-examples/pom.xml         | 2 +-
 stratosphere-examples/stratosphere-scala-examples/pom.xml        | 2 +-
 stratosphere-java/pom.xml                                        | 2 +-
 stratosphere-quickstart/pom.xml                                  | 2 +-
 stratosphere-quickstart/quickstart-java/pom.xml                  | 2 +-
 .../src/main/resources/archetype-resources/pom.xml               | 4 ++--
 stratosphere-quickstart/quickstart-scala/pom.xml                 | 2 +-
 .../src/main/resources/archetype-resources/pom.xml               | 4 ++--
 stratosphere-runtime/pom.xml                                     | 2 +-
 stratosphere-scala/pom.xml                                       | 2 +-
 stratosphere-test-utils/pom.xml                                  | 2 +-
 stratosphere-tests/pom.xml                                       | 2 +-
 tools/change-version                                             | 2 +-
 26 files changed, 28 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c784957..fbd3617 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
 
 	<groupId>eu.stratosphere</groupId>
 	<artifactId>stratosphere</artifactId>
-	<version>0.5.1-SNAPSHOT</version>
+	<version>0.5.1</version>
 
 	<name>stratosphere</name>
 	<packaging>pom</packaging>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/avro/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/pom.xml b/stratosphere-addons/avro/pom.xml
index e9da4e4..1841549 100644
--- a/stratosphere-addons/avro/pom.xml
+++ b/stratosphere-addons/avro/pom.xml
@@ -8,7 +8,7 @@
 	<parent>
 		<artifactId>stratosphere-addons</artifactId>
 		<groupId>eu.stratosphere</groupId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/pom.xml b/stratosphere-addons/hadoop-compatibility/pom.xml
index 7beae22..8d5aec9 100644
--- a/stratosphere-addons/hadoop-compatibility/pom.xml
+++ b/stratosphere-addons/hadoop-compatibility/pom.xml
@@ -8,7 +8,7 @@
 	<parent>
 		<artifactId>stratosphere-addons</artifactId>
 		<groupId>eu.stratosphere</groupId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hbase/pom.xml b/stratosphere-addons/hbase/pom.xml
index a53cc98..0f90200 100644
--- a/stratosphere-addons/hbase/pom.xml
+++ b/stratosphere-addons/hbase/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<artifactId>stratosphere-addons</artifactId>
 		<groupId>eu.stratosphere</groupId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/jdbc/pom.xml b/stratosphere-addons/jdbc/pom.xml
index 7e6dd7c..c411c55 100644
--- a/stratosphere-addons/jdbc/pom.xml
+++ b/stratosphere-addons/jdbc/pom.xml
@@ -8,7 +8,7 @@
 	<parent>
 		<artifactId>stratosphere-addons</artifactId>
 		<groupId>eu.stratosphere</groupId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/pom.xml b/stratosphere-addons/pom.xml
index 4effcdc..d6f1214 100644
--- a/stratosphere-addons/pom.xml
+++ b/stratosphere-addons/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/spargel/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/spargel/pom.xml b/stratosphere-addons/spargel/pom.xml
index 1ebe79e..bf35a14 100644
--- a/stratosphere-addons/spargel/pom.xml
+++ b/stratosphere-addons/spargel/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<artifactId>stratosphere-addons</artifactId>
 		<groupId>eu.stratosphere</groupId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/yarn/pom.xml b/stratosphere-addons/yarn/pom.xml
index 1f31a03..2da1394 100644
--- a/stratosphere-addons/yarn/pom.xml
+++ b/stratosphere-addons/yarn/pom.xml
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere-addons</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-clients/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-clients/pom.xml b/stratosphere-clients/pom.xml
index d812d3c..1562f19 100644
--- a/stratosphere-clients/pom.xml
+++ b/stratosphere-clients/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-compiler/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/pom.xml b/stratosphere-compiler/pom.xml
index 2de4a6a..4a727a8 100644
--- a/stratosphere-compiler/pom.xml
+++ b/stratosphere-compiler/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-core/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-core/pom.xml b/stratosphere-core/pom.xml
index 27f359d..7512969 100644
--- a/stratosphere-core/pom.xml
+++ b/stratosphere-core/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-dist/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-dist/pom.xml b/stratosphere-dist/pom.xml
index e9e3ffd..da0bf01 100644
--- a/stratosphere-dist/pom.xml
+++ b/stratosphere-dist/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/pom.xml b/stratosphere-examples/pom.xml
index aff3aca..e1b0368 100644
--- a/stratosphere-examples/pom.xml
+++ b/stratosphere-examples/pom.xml
@@ -6,7 +6,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-examples/stratosphere-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/pom.xml b/stratosphere-examples/stratosphere-java-examples/pom.xml
index 43500e8..24302c0 100644
--- a/stratosphere-examples/stratosphere-java-examples/pom.xml
+++ b/stratosphere-examples/stratosphere-java-examples/pom.xml
@@ -6,7 +6,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere-examples</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-examples/stratosphere-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-scala-examples/pom.xml b/stratosphere-examples/stratosphere-scala-examples/pom.xml
index ea8a3c9..da94c3a 100644
--- a/stratosphere-examples/stratosphere-scala-examples/pom.xml
+++ b/stratosphere-examples/stratosphere-scala-examples/pom.xml
@@ -6,7 +6,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere-examples</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-java/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-java/pom.xml b/stratosphere-java/pom.xml
index 7f1fe29..4df0ea4 100644
--- a/stratosphere-java/pom.xml
+++ b/stratosphere-java/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/pom.xml b/stratosphere-quickstart/pom.xml
index fa4a668..fa08572 100644
--- a/stratosphere-quickstart/pom.xml
+++ b/stratosphere-quickstart/pom.xml
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/quickstart-java/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-java/pom.xml b/stratosphere-quickstart/quickstart-java/pom.xml
index f813eb1..6da8c93 100644
--- a/stratosphere-quickstart/quickstart-java/pom.xml
+++ b/stratosphere-quickstart/quickstart-java/pom.xml
@@ -9,7 +9,7 @@
   <parent>
     <groupId>eu.stratosphere</groupId>
     <artifactId>stratosphere-quickstart</artifactId>
-    <version>0.5.1-SNAPSHOT</version>
+    <version>0.5.1</version>
     <relativePath>..</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml b/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
index 82d810c..0a51cc8 100644
--- a/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -20,12 +20,12 @@
 		<dependency>
 			<groupId>eu.stratosphere</groupId>
 			<artifactId>stratosphere-java</artifactId>
-			<version>0.5.1-SNAPSHOT</version>
+			<version>0.5.1</version>
 		</dependency>
 		<dependency>
 			<groupId>eu.stratosphere</groupId>
 			<artifactId>stratosphere-clients</artifactId>
-			<version>0.5.1-SNAPSHOT</version>
+			<version>0.5.1</version>
 		</dependency>
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/quickstart-scala/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-scala/pom.xml b/stratosphere-quickstart/quickstart-scala/pom.xml
index 91cf2d4..afffef1 100644
--- a/stratosphere-quickstart/quickstart-scala/pom.xml
+++ b/stratosphere-quickstart/quickstart-scala/pom.xml
@@ -9,7 +9,7 @@
   <parent>
     <groupId>eu.stratosphere</groupId>
     <artifactId>stratosphere-quickstart</artifactId>
-    <version>0.5.1-SNAPSHOT</version>
+    <version>0.5.1</version>
     <relativePath>..</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml b/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 4910746..70a17af 100644
--- a/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -22,12 +22,12 @@
     <dependency>
       <groupId>eu.stratosphere</groupId>
       <artifactId>stratosphere-scala</artifactId>
-      <version>0.5.1-SNAPSHOT</version>
+      <version>0.5.1</version>
     </dependency>
     <dependency>
       <groupId>eu.stratosphere</groupId>
       <artifactId>stratosphere-clients</artifactId>
-      <version>0.5.1-SNAPSHOT</version>
+      <version>0.5.1</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/pom.xml b/stratosphere-runtime/pom.xml
index 13f6cfb..8d9d175 100644
--- a/stratosphere-runtime/pom.xml
+++ b/stratosphere-runtime/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-scala/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-scala/pom.xml b/stratosphere-scala/pom.xml
index c9c6eb3..f511825 100644
--- a/stratosphere-scala/pom.xml
+++ b/stratosphere-scala/pom.xml
@@ -6,7 +6,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/pom.xml b/stratosphere-test-utils/pom.xml
index 29fa208..ec3695e 100644
--- a/stratosphere-test-utils/pom.xml
+++ b/stratosphere-test-utils/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-tests/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-tests/pom.xml b/stratosphere-tests/pom.xml
index 7dec412..04eaeba 100644
--- a/stratosphere-tests/pom.xml
+++ b/stratosphere-tests/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>eu.stratosphere</groupId>
 		<artifactId>stratosphere</artifactId>
-		<version>0.5.1-SNAPSHOT</version>
+		<version>0.5.1</version>
 		<relativePath>..</relativePath>
 	</parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/tools/change-version
----------------------------------------------------------------------
diff --git a/tools/change-version b/tools/change-version
index ef2497e..b82d2cd 100755
--- a/tools/change-version
+++ b/tools/change-version
@@ -12,4 +12,4 @@
 # specific language governing permissions and limitations under the License.
 ########################################################################################################################
 
-find .. -name 'pom.xml' -type f -exec sed -i 's#<version>0.5</version>#<version>0.5.1-SNAPSHOT</version>#' {} \;
+find .. -name 'pom.xml' -type f -exec sed -i 's#<version>0.5.1-SNAPSHOT</version>#<version>0.5.1</version>#' {} \;