You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/12 23:03:52 UTC
[1/9] git commit: Add to distributed cache executable flag, directory support
Repository: incubator-flink
Updated Branches:
refs/heads/release-0.5.1 94d944af8 -> 67bb703d7
Add to distributed cache executable flag, directory support
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e9f42258
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e9f42258
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e9f42258
Branch: refs/heads/release-0.5.1
Commit: e9f42258ca65f5cb4918fb6eab743920e23bb87a
Parents: 94d944a
Author: zentol <s....@web.de>
Authored: Sat May 24 17:43:57 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:40 2014 +0200
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 5 +-
.../java/eu/stratosphere/api/common/Plan.java | 21 ++---
.../api/common/cache/DistributedCache.java | 28 +++++--
.../api/java/ExecutionEnvironment.java | 30 +++++--
.../nephele/taskmanager/TaskManager.java | 7 +-
.../pact/runtime/cache/FileCache.java | 84 ++++++++++++++------
.../cache/FileCacheDeleteValidationTest.java | 14 ++--
.../distributedCache/DistributedCacheTest.java | 3 +-
8 files changed, 131 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 53b4cc1..caff8de 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -30,6 +30,7 @@ import eu.stratosphere.api.common.aggregators.AggregatorWithName;
import eu.stratosphere.api.common.aggregators.ConvergenceCriterion;
import eu.stratosphere.api.common.aggregators.LongSumAggregator;
import eu.stratosphere.api.common.cache.DistributedCache;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import eu.stratosphere.api.common.distributions.DataDistribution;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.compiler.CompilerException;
@@ -206,8 +207,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
}
// add registered cache file into job configuration
- for (Entry<String, String> e: program.getOriginalPactPlan().getCachedFiles()) {
- DistributedCache.addCachedFile(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration());
+ for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
+ DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration());
}
JobGraph graph = this.jobGraph;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
index 6d4b356..dc3adb6 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
@@ -15,6 +15,7 @@ package eu.stratosphere.api.common;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import java.util.ArrayList;
import java.util.Calendar;
@@ -66,7 +67,7 @@ public class Plan implements Visitable<Operator<?>> {
*/
protected int maxNumberMachines;
- protected HashMap<String, String> cacheFile = new HashMap<String, String>();
+ protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap();
// ------------------------------------------------------------------------
@@ -301,28 +302,28 @@ public class Plan implements Visitable<Operator<?>> {
/**
* register cache files in program level
- * @param filePath The files must be stored in a place that can be accessed from all workers (most commonly HDFS)
+ * @param entry contains all relevant information
* @param name user defined name of that file
* @throws java.io.IOException
*/
- public void registerCachedFile(String filePath, String name) throws IOException {
+ public void registerCachedFile(String name, DistributedCacheEntry entry) throws IOException {
if (!this.cacheFile.containsKey(name)) {
try {
- URI u = new URI(filePath);
+ URI u = new URI(entry.filePath);
if (!u.getPath().startsWith("/")) {
- u = new URI(new File(filePath).getAbsolutePath());
+ u = new File(entry.filePath).toURI();
}
FileSystem fs = FileSystem.get(u);
if (fs.exists(new Path(u.getPath()))) {
- this.cacheFile.put(name, u.toString());
+ this.cacheFile.put(name, new DistributedCacheEntry(u.toString(), entry.isExecutable));
} else {
- throw new RuntimeException("File " + u.toString() + " doesn't exist.");
+ throw new IOException("File " + u.toString() + " doesn't exist.");
}
} catch (URISyntaxException ex) {
- throw new RuntimeException("Invalid path: " + filePath, ex);
+ throw new IOException("Invalid path: " + entry.filePath, ex);
}
} else {
- throw new RuntimeException("cache file " + name + "already exists!");
+ throw new IOException("cache file " + name + "already exists!");
}
}
@@ -330,7 +331,7 @@ public class Plan implements Visitable<Operator<?>> {
* return the registered caches files
* @return Set of (name, filePath) pairs
*/
- public Set<Entry<String,String>> getCachedFiles() {
+ public Set<Entry<String,DistributedCacheEntry>> getCachedFiles() {
return this.cacheFile.entrySet();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
index bd07ec8..0ddb46d 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
@@ -25,35 +25,49 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
/**
- * DistributedCache provides static method to write the registered cache files into job configuration or decode
+ * DistributedCache provides static methods to write the registered cache files into job configuration or decode
* them from job configuration. It also provides user access to the file locally.
*/
public class DistributedCache {
+
+ public static class DistributedCacheEntry {
+ public String filePath;
+ public Boolean isExecutable;
+
+ public DistributedCacheEntry(String filePath, Boolean isExecutable){
+ this.filePath=filePath;
+ this.isExecutable=isExecutable;
+ }
+ }
final static String CACHE_FILE_NUM = "DISTRIBUTED_CACHE_FILE_NUM";
final static String CACHE_FILE_NAME = "DISTRIBUTED_CACHE_FILE_NAME_";
final static String CACHE_FILE_PATH = "DISTRIBUTED_CACHE_FILE_PATH_";
+
+ final static String CACHE_FILE_EXE = "DISTRIBUTED_CACHE_FILE_EXE_";
public final static String TMP_PREFIX = "tmp_";
private Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
- public static void addCachedFile(String name, String filePath, Configuration conf) {
+ public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, Configuration conf) {
int num = conf.getInteger(CACHE_FILE_NUM,0) + 1;
conf.setInteger(CACHE_FILE_NUM, num);
conf.setString(CACHE_FILE_NAME + num, name);
- conf.setString(CACHE_FILE_PATH + num, filePath);
+ conf.setString(CACHE_FILE_PATH + num, e.filePath);
+ conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute());
}
- public static Set<Entry<String,String>> getCachedFile(Configuration conf) {
- Map<String, String> cacheFiles = new HashMap<String, String>();
- int num = conf.getInteger(CACHE_FILE_NUM,0);
+ public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(Configuration conf) {
+ Map<String, DistributedCacheEntry> cacheFiles = new HashMap<String, DistributedCacheEntry>();
+ int num = conf.getInteger(CACHE_FILE_NUM, 0);
for (int i = 1; i <= num; i++) {
String name = conf.getString(CACHE_FILE_NAME + i, "");
String filePath = conf.getString(CACHE_FILE_PATH + i, "");
- cacheFiles.put(name, filePath);
+ Boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
+ cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable));
}
return cacheFiles.entrySet();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
index 9b316cc..f04cddd 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.Validate;
import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.java.io.CollectionInputFormat;
import eu.stratosphere.api.java.io.CsvReader;
@@ -87,7 +88,7 @@ public abstract class ExecutionEnvironment {
private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
- private final List<Tuple2<String, String>> cacheFile = new ArrayList<Tuple2<String, String>>();
+ private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList();
private int degreeOfParallelism = -1;
@@ -542,8 +543,8 @@ public abstract class ExecutionEnvironment {
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
- * may be local files, or files in a distributed file system. The runtime will copy the files
- * temporarily to a local cache, if needed.
+ * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
* <p>
* The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via
* {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access
@@ -554,7 +555,26 @@ public abstract class ExecutionEnvironment {
* @param name The name under which the file is registered.
*/
public void registerCachedFile(String filePath, String name){
- this.cacheFile.add(new Tuple2<String, String>(filePath, name));
+ registerCachedFile(filePath, name, false);
+ }
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ * <p>
+ * The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access
+ * {@link eu.stratosphere.api.common.cache.DistributedCache} via
+ * {@link eu.stratosphere.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ * @param executable flag indicating whether the file should be executable
+ */
+ public void registerCachedFile(String filePath, String name, boolean executable){
+ this.cacheFile.add(new Tuple2(name, new DistributedCacheEntry(filePath, executable)));
}
/**
@@ -565,7 +585,7 @@ public abstract class ExecutionEnvironment {
* @throws IOException Thrown if checks for existence and sanity fail.
*/
protected void registerCachedFilesWithPlan(Plan p) throws IOException {
- for (Tuple2<String, String> entry : cacheFile) {
+ for (Tuple2<String, DistributedCacheEntry> entry : cacheFile) {
p.registerCachedFile(entry.f0, entry.f1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 3b478cf..ebbdeb5 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -50,6 +50,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import eu.stratosphere.api.common.cache.DistributedCache;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
@@ -666,7 +667,7 @@ public class TaskManager implements TaskOperationProtocol {
// retrieve the registered cache files from job configuration and create the local tmp file.
Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
- for (Entry<String, String> e: DistributedCache.getCachedFile(tdd.getJobConfiguration())) {
+ for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
cpTasks.put(e.getKey(), cp);
}
@@ -801,8 +802,8 @@ public class TaskManager implements TaskOperationProtocol {
}
// remove the local tmp file for unregistered tasks.
- for (Entry<String, String> e: DistributedCache.getCachedFile(task.getEnvironment().getJobConfiguration())) {
- this.fileCache.deleteTmpFile(e.getKey(), task.getJobID());
+ for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
+ this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
}
// Unregister task from the byte buffered channel manager
this.channelManager.unregister(id, task);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index 26eb9fe..6a38cd8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -13,6 +13,8 @@
package eu.stratosphere.pact.runtime.cache;
+import eu.stratosphere.api.common.cache.DistributedCache;
+import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -25,11 +27,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import eu.stratosphere.api.common.cache.DistributedCache;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.FSDataInputStream;
import eu.stratosphere.core.fs.FSDataOutputStream;
+import eu.stratosphere.core.fs.FileStatus;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.fs.local.LocalFileSystem;
@@ -38,9 +41,9 @@ import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
import eu.stratosphere.nephele.util.IOUtils;
/**
- * FileCache is used to create the local tmp file for the registered cache file when a task is deployed. Also when the
- * task is unregistered, it will remove the local tmp file. Given that another task from the same job may be registered
- * shortly after, there exists a 5 second delay before clearing the local tmp file.
+ * The FileCache is used to create the local files for the registered cache files when a task is deployed.
+ * The files will be removed when the task is unregistered after a 5 second delay.
+ * A given file x will be placed in "<system-tmp-dir>/tmp_<jobID>/".
*/
public class FileCache {
@@ -52,34 +55,40 @@ public class FileCache {
/**
* If the file doesn't exists locally, it will copy the file to the temp directory.
+ * @param name file identifier
+ * @param entry entry containing all relevant information
+ * @param jobID
+ * @return copy task
*/
- public FutureTask<Path> createTmpFile(String name, String filePath, JobID jobID) {
-
+ public FutureTask<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
synchronized (count) {
- Pair<JobID, String> key = new ImmutablePair(jobID,name);
+ Pair<JobID, String> key = new ImmutablePair(jobID, name);
if (count.containsKey(key)) {
count.put(key, count.get(key) + 1);
} else {
count.put(key, 1);
}
}
- CopyProcess cp = new CopyProcess(name, filePath, jobID);
+ CopyProcess cp = new CopyProcess(name, entry, jobID);
FutureTask<Path> copyTask = new FutureTask<Path>(cp);
executorService.submit(copyTask);
return copyTask;
}
/**
- * Leave a 5 seconds delay to clear the local file.
+ * Deletes the local file after a 5 second delay.
+ * @param name file identifier
+ * @param entry entry containing all relevant information
+ * @param jobID
*/
- public void deleteTmpFile(String name, JobID jobID) {
- DeleteProcess dp = new DeleteProcess(name, jobID, count.get(new ImmutablePair(jobID,name)));
+ public void deleteTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
+ DeleteProcess dp = new DeleteProcess(name, entry, jobID, count.get(new ImmutablePair(jobID,name)));
executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
}
- public Path getTempDir(JobID jobID, String name) {
+ public Path getTempDir(JobID jobID, String childPath) {
return new Path(GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), DistributedCache.TMP_PREFIX + jobID.toString() + "_" + name);
+ ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), DistributedCache.TMP_PREFIX + jobID.toString() + "/" + childPath);
}
public void shutdown() {
@@ -100,49 +109,72 @@ public class FileCache {
private JobID jobID;
private String name;
private String filePath;
+ private Boolean executable;
- public CopyProcess(String name, String filePath, JobID jobID) {
+ public CopyProcess(String name, DistributedCacheEntry e, JobID jobID) {
this.name = name;
- this.filePath = filePath;
+ this.filePath = e.filePath;
+ this.executable = e.isExecutable;
this.jobID = jobID;
}
+ @Override
public Path call() {
- Path tmp = getTempDir(jobID, name);
+ Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
try {
- if (!lfs.exists(tmp)) {
- FSDataOutputStream lfsOutput = lfs.create(tmp, false);
- Path distributedPath = new Path(filePath);
- FileSystem fs = distributedPath.getFileSystem();
- FSDataInputStream fsInput = fs.open(distributedPath);
- IOUtils.copyBytes(fsInput, lfsOutput);
- }
+ create(new Path(filePath), tmp);
} catch (IOException e1) {
throw new RuntimeException("Error copying a file from hdfs to the local fs", e1);
}
return tmp;
}
+
+ private void create(Path distributedFilePath, Path localFilePath) throws IOException {
+ if (!lfs.exists(localFilePath)) {
+ FileSystem dfs = distributedFilePath.getFileSystem();
+ if (dfs.getFileStatus(distributedFilePath).isDir()) {
+ lfs.mkdirs(localFilePath);
+ FileStatus[] contents = dfs.listStatus(distributedFilePath);
+ for (FileStatus content : contents) {
+ String distPath = content.getPath().toString();
+ if (content.isDir()){
+ distPath = distPath.substring(0,distPath.length() - 1);
+ }
+ String localPath = localFilePath.toString() + distPath.substring(distPath.lastIndexOf("/"));
+ create(content.getPath(), new Path(localPath));
+ }
+ } else {
+ FSDataOutputStream lfsOutput = lfs.create(localFilePath, false);
+ FSDataInputStream fsInput = dfs.open(distributedFilePath);
+ IOUtils.copyBytes(fsInput, lfsOutput);
+ new File(localFilePath.toString()).setExecutable(executable);
+ }
+ }
+ }
}
+
/**
* If no task is using this file after 5 seconds, clear it.
*/
private class DeleteProcess implements Runnable {
private String name;
+ private String filePath;
private JobID jobID;
private int oldCount;
- public DeleteProcess(String name, JobID jobID, int c) {
+ public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
this.name = name;
+ this.filePath = e.filePath;
this.jobID = jobID;
this.oldCount = c;
}
-
+ @Override
public void run() {
synchronized (count) {
if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
return;
}
}
- Path tmp = getTempDir(jobID, name);
+ Path tmp = getTempDir(jobID, "");
try {
if (lfs.exists(tmp)) {
lfs.delete(tmp, true);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
index 40b649a..2ea9ddf 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
@@ -24,8 +24,8 @@ import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
-import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.fs.local.LocalFileSystem;
import eu.stratosphere.nephele.jobgraph.JobID;
@@ -67,13 +67,13 @@ public class FileCacheDeleteValidationTest {
public void testFileReuseForNextTask() {
JobID jobID = new JobID();
String filePath = f.toURI().toString();
- fileCache.createTmpFile("test_file", filePath, jobID);
+ fileCache.createTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted error", e);
}
- fileCache.deleteTmpFile("test_file", jobID);
+ fileCache.deleteTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -81,17 +81,17 @@ public class FileCacheDeleteValidationTest {
}
//new task comes after 1 second
try {
- Assert.assertTrue("Local cache file should not be deleted when another task comes in 5 seconds!", lfs.exists(fileCache.getTempDir(jobID, "test_file")));
+ Assert.assertTrue("Local cache file should not be deleted when another task comes in 5 seconds!", lfs.exists(fileCache.getTempDir(jobID, "cacheFile")));
} catch (IOException e) {
throw new RuntimeException("Interrupted error", e);
}
- fileCache.createTmpFile("test_file", filePath, jobID);
+ fileCache.createTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted error", e);
}
- fileCache.deleteTmpFile("test_file", jobID);
+ fileCache.deleteTmpFile("test_file", new DistributedCacheEntry(filePath, false), jobID);
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
@@ -99,7 +99,7 @@ public class FileCacheDeleteValidationTest {
}
//no task comes in 7 seconds
try {
- Assert.assertTrue("Local cache file should be deleted when no task comes in 5 seconds!", !lfs.exists(fileCache.getTempDir(jobID, "test_file")));
+ Assert.assertTrue("Local cache file should be deleted when no task comes in 5 seconds!", !lfs.exists(fileCache.getTempDir(jobID, "cacheFile")));
} catch (IOException e) {
throw new RuntimeException("Interrupted error", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e9f42258/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
index 9fdcc42..6e46f82 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
@@ -14,6 +14,7 @@
package eu.stratosphere.test.distributedCache;
import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
import eu.stratosphere.api.java.record.operators.FileDataSink;
import eu.stratosphere.api.java.record.operators.FileDataSource;
import eu.stratosphere.api.java.record.functions.MapFunction;
@@ -134,7 +135,7 @@ public class DistributedCacheTest extends RecordAPITestBase {
protected Plan getTestJob() {
Plan plan = getPlan(1 , textPath, resultPath);
try {
- plan.registerCachedFile(cachePath, "cache_test");
+ plan.registerCachedFile("cache_test", new DistributedCacheEntry(cachePath, false));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
[2/9] git commit: Distributed Cache general purpose copy method, thread safety
Posted by rm...@apache.org.
Distributed Cache general purpose copy method, thread safety
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1dd0acd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1dd0acd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1dd0acd4
Branch: refs/heads/release-0.5.1
Commit: 1dd0acd4d572b286f6afbb9562acffb3fdc20217
Parents: e9f4225
Author: zentol <s....@web.de>
Authored: Fri Jun 6 23:44:04 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:45 2014 +0200
----------------------------------------------------------------------
.../pact/runtime/cache/FileCache.java | 58 ++++++++++++--------
1 file changed, 34 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1dd0acd4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index 6a38cd8..882c3a9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -49,6 +49,8 @@ public class FileCache {
private LocalFileSystem lfs = new LocalFileSystem();
+ private static final Object lock = new Object();
+
private Map<Pair<JobID, String>, Integer> count = new HashMap<Pair<JobID,String>, Integer>();
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
@@ -102,6 +104,35 @@ public class FileCache {
}
}
+ public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
+ FileSystem sFS = sourcePath.getFileSystem();
+ FileSystem tFS = targetPath.getFileSystem();
+ if (!tFS.exists(targetPath)) {
+ if (sFS.getFileStatus(sourcePath).isDir()) {
+ tFS.mkdirs(targetPath);
+ FileStatus[] contents = sFS.listStatus(sourcePath);
+ for (FileStatus content : contents) {
+ String distPath = content.getPath().toString();
+ if (content.isDir()) {
+ if (distPath.endsWith("/")) {
+ distPath = distPath.substring(0, distPath.length() - 1);
+ }
+ }
+ String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
+ copy(content.getPath(), new Path(localPath), executable);
+ }
+ } else {
+ try {
+ FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
+ FSDataInputStream fsInput = sFS.open(sourcePath);
+ IOUtils.copyBytes(fsInput, lfsOutput);
+ new File(targetPath.toString()).setExecutable(executable);
+ } catch (IOException ioe) {
+ }
+ }
+ }
+ }
+
/**
* Asynchronous file copy process
*/
@@ -121,35 +152,14 @@ public class FileCache {
public Path call() {
Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
try {
- create(new Path(filePath), tmp);
+ synchronized (lock) {
+ copy(new Path(filePath), tmp, this.executable);
+ }
} catch (IOException e1) {
throw new RuntimeException("Error copying a file from hdfs to the local fs", e1);
}
return tmp;
}
-
- private void create(Path distributedFilePath, Path localFilePath) throws IOException {
- if (!lfs.exists(localFilePath)) {
- FileSystem dfs = distributedFilePath.getFileSystem();
- if (dfs.getFileStatus(distributedFilePath).isDir()) {
- lfs.mkdirs(localFilePath);
- FileStatus[] contents = dfs.listStatus(distributedFilePath);
- for (FileStatus content : contents) {
- String distPath = content.getPath().toString();
- if (content.isDir()){
- distPath = distPath.substring(0,distPath.length() - 1);
- }
- String localPath = localFilePath.toString() + distPath.substring(distPath.lastIndexOf("/"));
- create(content.getPath(), new Path(localPath));
- }
- } else {
- FSDataOutputStream lfsOutput = lfs.create(localFilePath, false);
- FSDataInputStream fsInput = dfs.open(distributedFilePath);
- IOUtils.copyBytes(fsInput, lfsOutput);
- new File(localFilePath.toString()).setExecutable(executable);
- }
- }
- }
}
/**
[8/9] git commit: quick and dirty fix for FLINK-927 and added more diagnostics information
Posted by rm...@apache.org.
quick and dirty fix for FLINK-927 and added more diagnostics information
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/21ecaaa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/21ecaaa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/21ecaaa5
Branch: refs/heads/release-0.5.1
Commit: 21ecaaa549fe052a5a636066d92495246bb4bdec
Parents: 54f3059
Author: rwaury <ro...@googlemail.com>
Authored: Thu Jun 12 16:18:59 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:14 2014 +0200
----------------------------------------------------------------------
.../pact/runtime/hash/CompactingHashTable.java | 36 +++++++++++++++++---
1 file changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21ecaaa5/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java
index 84da223..3f00493 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/CompactingHashTable.java
@@ -337,6 +337,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
public final void insert(T record) throws IOException {
+ if(this.closed.get()) {
+ return;
+ }
final int hashCode = hash(this.buildSideComparator.hash(record));
final int posHashCode = hashCode % this.numBuckets;
@@ -365,12 +368,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
" Message: " + ex.getMessage());
} catch (IndexOutOfBoundsException ex) {
throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
" Message: " + ex.getMessage());
}
@@ -383,12 +388,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
" Message: " + ex.getMessage());
} catch (IndexOutOfBoundsException ex) {
throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
" Message: " + ex.getMessage());
}
@@ -420,6 +427,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* @throws IOException
*/
public void insertOrReplaceRecord(T record, T tempHolder) throws IOException {
+ if(this.closed.get()) {
+ return;
+ }
final int searchHashCode = hash(this.buildSideComparator.hash(record));
final int posHashCode = searchHashCode % this.numBuckets;
@@ -480,12 +490,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
" Message: " + ex.getMessage());
} catch (IndexOutOfBoundsException ex) {
throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
" Message: " + ex.getMessage());
}
@@ -502,12 +514,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
" Message: " + ex.getMessage());
} catch (IndexOutOfBoundsException ex) {
throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
" Message: " + ex.getMessage());
}
@@ -766,8 +780,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
throw new RuntimeException("Memory ran out. numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length);
- //throw new RuntimeException("The hash table ran out of memory.");
}
}
@@ -809,6 +823,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
return minPartition;
}
+ private int getOverflowSegmentCount() { // total overflow segments over all partitions; reported in the out-of-memory messages
+ int total = 0;
+ for (InMemoryPartition<T> partition : this.partitions) {
+ total += partition.numOverflowSegments;
+ }
+ return total;
+ }
+
private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
// ----------------------------------------------------------------------------------------
// the following observations hold:
@@ -853,8 +875,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* @throws IOException
*/
private void compactPartition(int partitionNumber) throws IOException {
- // stop if no garbage exists
- if(this.partitions.get(partitionNumber).isCompacted()) {
+ // stop if no garbage exists or table is closed
+ if(this.partitions.get(partitionNumber).isCompacted() || this.closed.get()) {
return;
}
// release all segments owned by compaction partition
@@ -998,7 +1020,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
@Override
public T next(T reuse) throws IOException {
- if(done) {
+ if(done || this.table.closed.get()) {
return null;
} else if(!cache.isEmpty()) {
reuse = cache.remove(cache.size()-1);
@@ -1084,6 +1106,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
public boolean getMatchFor(PT probeSideRecord, T targetForMatch) {
+ if(closed.get()) {
+ return false;
+ }
final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
final int posHashCode = searchHashCode % numBuckets;
@@ -1154,6 +1179,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
public void updateMatch(T record) throws IOException {
+ if(closed.get()) {
+ return;
+ }
long newPointer = this.partition.appendRecord(record);
this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
this.partition.setCompaction(false); //FIXME Do we really create garbage here?
[6/9] git commit: Allow multiple successive programs on the same
execution environment.
Posted by rm...@apache.org.
Allow multiple successive programs on the same execution environment.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2a165eeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2a165eeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2a165eeb
Branch: refs/heads/release-0.5.1
Commit: 2a165eeb5094915b7df81537f963136696e8cc8b
Parents: 51b793f
Author: StephanEwen <st...@tu-berlin.de>
Authored: Thu Jun 12 16:06:04 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:05 2014 +0200
----------------------------------------------------------------------
.../api/java/ExecutionEnvironment.java | 7 +-
.../api/java/io/DiscardingOutputFormat.java | 41 ++++++++++++
.../api/java/MultipleInvokationsTest.java | 68 ++++++++++++++++++++
3 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
index f04cddd..2f7aef3 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
@@ -88,7 +88,7 @@ public abstract class ExecutionEnvironment {
private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
- private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList();
+ private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
private int degreeOfParallelism = -1;
@@ -574,7 +574,7 @@ public abstract class ExecutionEnvironment {
* @param executable flag indicating whether the file should be executable
*/
public void registerCachedFile(String filePath, String name, boolean executable){
- this.cacheFile.add(new Tuple2(name, new DistributedCacheEntry(filePath, executable)));
+ this.cacheFile.add(new Tuple2<String, DistributedCacheEntry>(name, new DistributedCacheEntry(filePath, executable)));
}
/**
@@ -635,6 +635,9 @@ public abstract class ExecutionEnvironment {
throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
}
+ // clear all the sinks such that the next execution does not redo everything
+ this.sinks.clear();
+
return plan;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
new file mode 100644
index 0000000..44912c9
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
@@ -0,0 +1,41 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.io;
+
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.configuration.Configuration;
+
+/**
+ * An {@link OutputFormat} that drops every record handed to it. Configuring,
+ * opening, writing and closing are all no-ops.
+ * @param <T> The type of the elements accepted by the output format.
+ */
+public class DiscardingOutputFormat<T> implements OutputFormat<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void configure(Configuration parameters) { /* no configuration needed */ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) { /* nothing to open */ }
+
+ @Override
+ public void writeRecord(T record) { /* record is intentionally dropped */ }
+
+ @Override
+ public void close() { /* nothing to release */ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
new file mode 100644
index 0000000..8159ec0
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
@@ -0,0 +1,68 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
+import eu.stratosphere.api.java.io.DiscardingOutputFormat;
+
+public class MultipleInvokationsTest {
+ // createProgramPlan() must only include the sinks defined since the previous plan was created.
+ @Test
+ public void testMultipleInvocationsGetPlan() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // ----------- Execution 1: two sinks on the same source ---------------
+
+ DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
+ data.print().name("print1");
+ data.output(new DiscardingOutputFormat<String>()).name("output1");
+
+ {
+ Plan p = env.createProgramPlan();
+ // both sinks of execution 1 are part of the first plan
+ assertEquals(2, p.getDataSinks().size());
+ for (GenericDataSinkBase<?> sink : p.getDataSinks()) {
+ assertTrue(sink.getName().equals("print1") || sink.getName().equals("output1"));
+ assertEquals("source1", sink.getInput().getName());
+ }
+ }
+
+ // ----------- Execution 2: one new sink on the same source ---------------
+
+ data.writeAsText("/some/file/path").name("textsink");
+
+ {
+ Plan p = env.createProgramPlan();
+ // the sinks of execution 1 were cleared when the first plan was created and must not reappear
+ assertEquals(1, p.getDataSinks().size());
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+ assertEquals("textsink", sink.getName());
+ assertEquals("source1", sink.getInput().getName());
+ }
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
[4/9] git commit: added own instantiate method to
CoGroupWithSolutionSetFirstDescriptor; added Test
Posted by rm...@apache.org.
added own instantiate method to CoGroupWithSolutionSetFirstDescriptor; added Test
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f403970e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f403970e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f403970e
Branch: refs/heads/release-0.5.1
Commit: f403970eb737e703e85927d05ba4ac48a594310a
Parents: 387bb5d
Author: Sebastian Kunert <sk...@gmail.com>
Authored: Thu Jun 12 14:06:24 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:55 2014 +0200
----------------------------------------------------------------------
.../compiler/operators/CoGroupDescriptor.java | 2 +-
.../CoGroupWithSolutionSetFirstDescriptor.java | 22 +++-
.../compiler/CoGroupSolutionSetFirstTest.java | 103 +++++++++++++++++++
3 files changed, 125 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
index e2614ea..edc6c69 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
@@ -128,7 +128,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
- boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
+ boolean[] inputOrders = in1.getLocalProperties().getOrdering() == null ? null : in1.getLocalProperties().getOrdering().getFieldSortDirections();
if (inputOrders == null || inputOrders.length < this.keys1.size()) {
throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
index 90d0662..e8dd433 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
@@ -17,9 +17,14 @@ import java.util.Collections;
import java.util.List;
import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.compiler.dag.TwoInputNode;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
import eu.stratosphere.compiler.util.Utils;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
/**
*
@@ -36,7 +41,22 @@ public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor {
RequestedLocalProperties sort = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
return Collections.singletonList(new LocalPropertiesPair(none, sort));
}
-
+
+ @Override
+ public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+ // input 1 (the solution set) is not sorted here, so the sort directions are taken from input 2
+ boolean[] inputOrders = in2.getLocalProperties().getOrdering() == null ? null : in2.getLocalProperties().getOrdering().getFieldSortDirections();
+ final int numKeys = this.keys2.size();
+ if (inputOrders == null || inputOrders.length < numKeys) {
+ throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");
+ } else if (inputOrders.length > numKeys) {
+ boolean[] truncated = new boolean[numKeys];
+ System.arraycopy(inputOrders, 0, truncated, 0, numKeys);
+ inputOrders = truncated;
+ }
+ return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
+ }
+
@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
new file mode 100644
index 0000000..d0fca91
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -0,0 +1,103 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.CoGroupFunction;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
+ public static class SimpleCGroup extends CoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception {
+ } // no-op: this test only compiles and inspects the optimized plan, it never runs the function
+ }
+
+ public static class SimpleMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+ return null; // never executed, only the plan structure matters
+ }
+ }
+
+ @Test
+ public void testCoGroupSolutionSet() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class);
+
+ DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0);
+
+ DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap());
+ DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup());
+ DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
+ DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
+
+ result.print();
+
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = null;
+ try {
+ oPlan = compileNoStats(plan);
+ } catch(CompilerException e) {
+ Assert.fail(e.getMessage());
+ }
+ final boolean[] foundIteration = new boolean[1]; // set by the visitor so the checks below cannot pass vacuously
+ oPlan.accept(new Visitor<PlanNode>() {
+ @Override
+ public boolean preVisit(PlanNode visitable) {
+ // only the solution set delta of the workset iteration is inspected
+ if (visitable instanceof WorksetIterationPlanNode) {
+ PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
+ foundIteration[0] = true;
+ //get the CoGroup
+ DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().next().getSource();
+ Channel in1 = dpn.getInput1();
+ Channel in2 = dpn.getInput2();
+
+ Assert.assertNull(in1.getLocalProperties().getOrdering());
+ Assert.assertNotNull(in2.getLocalProperties().getOrdering());
+ Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
+ Assert.assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
+ Assert.assertEquals(ShipStrategyType.PARTITION_HASH, in2.getShipStrategy());
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void postVisit(PlanNode visitable) {
+ // nothing to check here
+ }
+ });
+ Assert.assertTrue("The optimized plan does not contain a workset iteration.", foundIteration[0]);
+ }
+
+}
[7/9] git commit: Renamed Network Speed test to exclude the long
benchmark from regular test cycles
Posted by rm...@apache.org.
Renamed Network Speed test to exclude the long benchmark from regular test cycles
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/54f30591
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/54f30591
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/54f30591
Branch: refs/heads/release-0.5.1
Commit: 54f305919e0d1da147d2759ee32ea8d189ca3f3d
Parents: 2a165ee
Author: StephanEwen <st...@tu-berlin.de>
Authored: Thu Jun 12 17:50:30 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:10 2014 +0200
----------------------------------------------------------------------
.../test/runtime/NetworkStackNepheleITCase.java | 286 -------------------
.../test/runtime/NetworkStackThroughput.java | 286 +++++++++++++++++++
2 files changed, 286 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/54f30591/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
deleted file mode 100644
index 64026a2..0000000
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.test.runtime;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.jobgraph.DistributionPattern;
-import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-import eu.stratosphere.runtime.io.api.RecordReader;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.runtime.io.channels.ChannelType;
-import eu.stratosphere.test.util.RecordAPITestBase;
-import eu.stratosphere.util.LogUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class NetworkStackNepheleITCase extends RecordAPITestBase {
-
- private static final Log LOG = LogFactory.getLog(NetworkStackNepheleITCase.class);
-
- private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
-
- private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
-
- private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
-
- private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
-
- private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
-
- private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
-
- private static final int IS_SLOW_SLEEP_MS = 10;
-
- private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
-
- // ------------------------------------------------------------------------
-
- public NetworkStackNepheleITCase(Configuration config) {
- super(config);
-
- setNumTaskManager(2);
- LogUtils.initializeDefaultConsoleLogger();
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() {
- Object[][] configParams = new Object[][]{
- new Object[]{1, false, false, false, 4, 2},
- new Object[]{1, true, false, false, 4, 2},
- new Object[]{1, true, true, false, 4, 2},
- new Object[]{1, true, false, true, 4, 2},
- new Object[]{2, true, false, false, 4, 2},
- new Object[]{4, true, false, false, 4, 2},
- new Object[]{4, true, false, false, 8, 4},
- new Object[]{4, true, false, false, 16, 8},
- };
-
- List<Configuration> configs = new ArrayList<Configuration>(configParams.length);
- for (Object[] p : configParams) {
- Configuration config = new Configuration();
- config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
- config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
- config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
- config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
- config.setInteger(NUM_SUBTASKS_CONFIG_KEY, (Integer) p[4]);
- config.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, (Integer) p[5]);
-
- configs.add(config);
- }
-
- return toParameterList(configs);
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- protected JobGraph getJobGraph() throws Exception {
- int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
- boolean useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
- boolean isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
- boolean isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
- int numSubtasks = this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1);
- int numSubtasksPerInstance = this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, 1);
-
- return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, numSubtasks, numSubtasksPerInstance);
- }
-
- @After
- public void calculateThroughput() {
- if (getJobExecutionResult() != null) {
- int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-
- double dataVolumeMbit = dataVolumeGb * 8192.0;
- double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
-
- int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
-
- LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
- "runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
- }
- }
-
- private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver,
- int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException {
-
- JobGraph jobGraph = new JobGraph("Speed Test");
-
- JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
- producer.setInputClass(SpeedTestProducer.class);
- producer.setNumberOfSubtasks(numSubtasks);
- producer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
- producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
- producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
-
- JobTaskVertex forwarder = null;
- if (useForwarder) {
- forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
- forwarder.setTaskClass(SpeedTestForwarder.class);
- forwarder.setNumberOfSubtasks(numSubtasks);
- forwarder.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
- }
-
- JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
- consumer.setOutputClass(SpeedTestConsumer.class);
- consumer.setNumberOfSubtasks(numSubtasks);
- consumer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
- consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
-
- if (useForwarder) {
- producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-
- forwarder.setVertexToShareInstancesWith(producer);
- consumer.setVertexToShareInstancesWith(producer);
- }
- else {
- producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- producer.setVertexToShareInstancesWith(consumer);
- }
-
- return jobGraph;
- }
-
- // ------------------------------------------------------------------------
-
- public static class SpeedTestProducer extends AbstractGenericInputTask {
-
- private RecordWriter<SpeedTestRecord> writer;
-
- @Override
- public void registerInputOutput() {
- this.writer = new RecordWriter<SpeedTestRecord>(this);
- }
-
- @Override
- public void invoke() throws Exception {
- this.writer.initializeSerializers();
-
- // Determine the amount of data to send per subtask
- int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackNepheleITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
-
- long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
- long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
-
- LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
- getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
- SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));
-
- boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
-
- int numRecords = 0;
- SpeedTestRecord record = new SpeedTestRecord();
- for (long i = 0; i < numRecordsToEmit; i++) {
- if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
- Thread.sleep(IS_SLOW_SLEEP_MS);
- }
-
- this.writer.emit(record);
- }
-
- this.writer.flush();
- }
- }
-
- public static class SpeedTestForwarder extends AbstractTask {
-
- private RecordReader<SpeedTestRecord> reader;
-
- private RecordWriter<SpeedTestRecord> writer;
-
- @Override
- public void registerInputOutput() {
- this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
- this.writer = new RecordWriter<SpeedTestRecord>(this);
- }
-
- @Override
- public void invoke() throws Exception {
- this.writer.initializeSerializers();
-
- SpeedTestRecord record;
- while ((record = this.reader.next()) != null) {
- this.writer.emit(record);
- }
-
- this.writer.flush();
- }
- }
-
- public static class SpeedTestConsumer extends AbstractOutputTask {
-
- private RecordReader<SpeedTestRecord> reader;
-
- @Override
- public void registerInputOutput() {
- this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
- }
-
- @Override
- public void invoke() throws Exception {
- boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
-
- int numRecords = 0;
- while (this.reader.next() != null) {
- if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
- Thread.sleep(IS_SLOW_SLEEP_MS);
- }
- }
- }
- }
-
- public static class SpeedTestRecord implements IOReadableWritable {
-
- private static final int RECORD_SIZE = 128;
-
- private final byte[] buf = new byte[RECORD_SIZE];
-
- public SpeedTestRecord() {
- for (int i = 0; i < RECORD_SIZE; ++i) {
- this.buf[i] = (byte) (i % 128);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.write(this.buf);
- }
-
- @Override
- public void read(DataInput in) throws IOException {
- in.readFully(this.buf);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/54f30591/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
new file mode 100644
index 0000000..fae3f99
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
@@ -0,0 +1,286 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.runtime;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
+import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
+import eu.stratosphere.nephele.template.AbstractGenericInputTask;
+import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.util.LogUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class NetworkStackThroughput extends RecordAPITestBase {
+
+ private static final Log LOG = LogFactory.getLog(NetworkStackThroughput.class);
+
+ private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
+
+ private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
+
+ private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
+
+ private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
+
+ private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
+
+ private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
+
+ private static final int IS_SLOW_SLEEP_MS = 10;
+
+ private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+ // ------------------------------------------------------------------------
+
+ public NetworkStackThroughput(Configuration config) {
+ super(config);
+
+ setNumTaskManager(2);
+ LogUtils.initializeDefaultConsoleLogger();
+ }
+
+ @Parameters
+ public static Collection<Object[]> getConfigurations() {
+ Object[][] configParams = new Object[][]{
+ new Object[]{1, false, false, false, 4, 2},
+ new Object[]{1, true, false, false, 4, 2},
+ new Object[]{1, true, true, false, 4, 2},
+ new Object[]{1, true, false, true, 4, 2},
+ new Object[]{2, true, false, false, 4, 2},
+ new Object[]{4, true, false, false, 4, 2},
+ new Object[]{4, true, false, false, 8, 4},
+ new Object[]{4, true, false, false, 16, 8},
+ };
+
+ List<Configuration> configs = new ArrayList<Configuration>(configParams.length);
+ for (Object[] p : configParams) {
+ Configuration config = new Configuration();
+ config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
+ config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
+ config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
+ config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
+ config.setInteger(NUM_SUBTASKS_CONFIG_KEY, (Integer) p[4]);
+ config.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, (Integer) p[5]);
+
+ configs.add(config);
+ }
+
+ return toParameterList(configs);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected JobGraph getJobGraph() throws Exception {
+ int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+ boolean useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
+ boolean isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+ boolean isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+ int numSubtasks = this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1);
+ int numSubtasksPerInstance = this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, 1);
+
+ return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, numSubtasks, numSubtasksPerInstance);
+ }
+
+ @After
+ public void calculateThroughput() {
+ if (getJobExecutionResult() != null) {
+ int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+ double dataVolumeMbit = dataVolumeGb * 8192.0;
+ double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
+
+ int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
+
+ LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
+ "runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
+ }
+ }
+
+ private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver,
+ int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException {
+
+ JobGraph jobGraph = new JobGraph("Speed Test");
+
+ JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
+ producer.setInputClass(SpeedTestProducer.class);
+ producer.setNumberOfSubtasks(numSubtasks);
+ producer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+ producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
+ producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
+
+ JobTaskVertex forwarder = null;
+ if (useForwarder) {
+ forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
+ forwarder.setTaskClass(SpeedTestForwarder.class);
+ forwarder.setNumberOfSubtasks(numSubtasks);
+ forwarder.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+ }
+
+ JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
+ consumer.setOutputClass(SpeedTestConsumer.class);
+ consumer.setNumberOfSubtasks(numSubtasks);
+ consumer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+ consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
+
+ if (useForwarder) {
+ producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+ forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+
+ forwarder.setVertexToShareInstancesWith(producer);
+ consumer.setVertexToShareInstancesWith(producer);
+ }
+ else {
+ producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+ producer.setVertexToShareInstancesWith(consumer);
+ }
+
+ return jobGraph;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static class SpeedTestProducer extends AbstractGenericInputTask {
+
+ private RecordWriter<SpeedTestRecord> writer;
+
+ @Override
+ public void registerInputOutput() {
+ this.writer = new RecordWriter<SpeedTestRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ this.writer.initializeSerializers();
+
+ // Determine the amount of data to send per subtask
+ int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackThroughput.DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+ long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
+ long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+ LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
+ getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
+ SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));
+
+ boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+
+ int numRecords = 0;
+ SpeedTestRecord record = new SpeedTestRecord();
+ for (long i = 0; i < numRecordsToEmit; i++) {
+ if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+ Thread.sleep(IS_SLOW_SLEEP_MS);
+ }
+
+ this.writer.emit(record);
+ }
+
+ this.writer.flush();
+ }
+ }
+
+ public static class SpeedTestForwarder extends AbstractTask {
+
+ private RecordReader<SpeedTestRecord> reader;
+
+ private RecordWriter<SpeedTestRecord> writer;
+
+ @Override
+ public void registerInputOutput() {
+ this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+ this.writer = new RecordWriter<SpeedTestRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ this.writer.initializeSerializers();
+
+ SpeedTestRecord record;
+ while ((record = this.reader.next()) != null) {
+ this.writer.emit(record);
+ }
+
+ this.writer.flush();
+ }
+ }
+
+ public static class SpeedTestConsumer extends AbstractOutputTask {
+
+ private RecordReader<SpeedTestRecord> reader;
+
+ @Override
+ public void registerInputOutput() {
+ this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+
+ int numRecords = 0;
+ while (this.reader.next() != null) {
+ if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+ Thread.sleep(IS_SLOW_SLEEP_MS);
+ }
+ }
+ }
+ }
+
+ public static class SpeedTestRecord implements IOReadableWritable {
+
+ private static final int RECORD_SIZE = 128;
+
+ private final byte[] buf = new byte[RECORD_SIZE];
+
+ public SpeedTestRecord() {
+ for (int i = 0; i < RECORD_SIZE; ++i) {
+ this.buf[i] = (byte) (i % 128);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(this.buf);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ in.readFully(this.buf);
+ }
+ }
+}
[5/9] git commit: FS.get(): Add URI causing exception to message.
Posted by rm...@apache.org.
FS.get(): Add URI causing exception to message.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/51b793fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/51b793fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/51b793fd
Branch: refs/heads/release-0.5.1
Commit: 51b793fdcf14cd0263563811be43c82290c337c6
Parents: f403970
Author: zentol <s....@web.de>
Authored: Thu Jun 12 13:10:40 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:00 2014 +0200
----------------------------------------------------------------------
.../src/main/java/eu/stratosphere/core/fs/FileSystem.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51b793fd/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
index e6d187e..007a74f 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
@@ -200,7 +200,8 @@ public abstract class FileSystem {
}
catch (URISyntaxException e) {
// we tried to repair it, but could not. report the scheme error
- throw new IOException("FileSystem: Scheme is null. file:// or hdfs:// are example schemes.");
+ throw new IOException("FileSystem: Scheme is null. file:// or hdfs:// are example schemes. "
+ + "Failed for " + uri.toString() + ".");
}
}
@@ -213,7 +214,8 @@ public abstract class FileSystem {
// Try to create a new file system
if (!FSDIRECTORY.containsKey(uri.getScheme())) {
- throw new IOException("No file system found with scheme " + uri.getScheme());
+ throw new IOException("No file system found with scheme " + uri.getScheme()
+ + ". Failed for " + uri.toString() + ".");
}
Class<? extends FileSystem> fsClass = null;
[3/9] git commit: Enable additional spargel compiler test
Posted by rm...@apache.org.
Enable additional spargel compiler test
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/387bb5d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/387bb5d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/387bb5d3
Branch: refs/heads/release-0.5.1
Commit: 387bb5d361374e1aa89bfc2d408c1d90ee1e961c
Parents: 1dd0acd
Author: StephanEwen <st...@tu-berlin.de>
Authored: Thu Jun 12 14:25:46 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:50 2014 +0200
----------------------------------------------------------------------
.../java/eu/stratosphere/spargel/java/SpargelCompilerTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/387bb5d3/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java b/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java
index 8b280f2..7a7d251 100644
--- a/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java
+++ b/stratosphere-addons/spargel/src/test/java/eu/stratosphere/spargel/java/SpargelCompilerTest.java
@@ -41,7 +41,7 @@ import eu.stratosphere.test.compiler.util.CompilerTestBase;
public class SpargelCompilerTest extends CompilerTestBase {
-// @Test
+ @Test
public void testSpargelCompiler() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
[9/9] git commit: Set version to 0.5.1
Posted by rm...@apache.org.
Set version to 0.5.1
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/67bb703d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/67bb703d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/67bb703d
Branch: refs/heads/release-0.5.1
Commit: 67bb703d794706b6d0a029743a0a6964e7cdd9cd
Parents: 21ecaaa
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Jun 12 21:29:16 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 21:29:16 2014 +0200
----------------------------------------------------------------------
pom.xml | 2 +-
stratosphere-addons/avro/pom.xml | 2 +-
stratosphere-addons/hadoop-compatibility/pom.xml | 2 +-
stratosphere-addons/hbase/pom.xml | 2 +-
stratosphere-addons/jdbc/pom.xml | 2 +-
stratosphere-addons/pom.xml | 2 +-
stratosphere-addons/spargel/pom.xml | 2 +-
stratosphere-addons/yarn/pom.xml | 2 +-
stratosphere-clients/pom.xml | 2 +-
stratosphere-compiler/pom.xml | 2 +-
stratosphere-core/pom.xml | 2 +-
stratosphere-dist/pom.xml | 2 +-
stratosphere-examples/pom.xml | 2 +-
stratosphere-examples/stratosphere-java-examples/pom.xml | 2 +-
stratosphere-examples/stratosphere-scala-examples/pom.xml | 2 +-
stratosphere-java/pom.xml | 2 +-
stratosphere-quickstart/pom.xml | 2 +-
stratosphere-quickstart/quickstart-java/pom.xml | 2 +-
.../src/main/resources/archetype-resources/pom.xml | 4 ++--
stratosphere-quickstart/quickstart-scala/pom.xml | 2 +-
.../src/main/resources/archetype-resources/pom.xml | 4 ++--
stratosphere-runtime/pom.xml | 2 +-
stratosphere-scala/pom.xml | 2 +-
stratosphere-test-utils/pom.xml | 2 +-
stratosphere-tests/pom.xml | 2 +-
tools/change-version | 2 +-
26 files changed, 28 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c784957..fbd3617 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<name>stratosphere</name>
<packaging>pom</packaging>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/avro/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/pom.xml b/stratosphere-addons/avro/pom.xml
index e9da4e4..1841549 100644
--- a/stratosphere-addons/avro/pom.xml
+++ b/stratosphere-addons/avro/pom.xml
@@ -8,7 +8,7 @@
<parent>
<artifactId>stratosphere-addons</artifactId>
<groupId>eu.stratosphere</groupId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/pom.xml b/stratosphere-addons/hadoop-compatibility/pom.xml
index 7beae22..8d5aec9 100644
--- a/stratosphere-addons/hadoop-compatibility/pom.xml
+++ b/stratosphere-addons/hadoop-compatibility/pom.xml
@@ -8,7 +8,7 @@
<parent>
<artifactId>stratosphere-addons</artifactId>
<groupId>eu.stratosphere</groupId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hbase/pom.xml b/stratosphere-addons/hbase/pom.xml
index a53cc98..0f90200 100644
--- a/stratosphere-addons/hbase/pom.xml
+++ b/stratosphere-addons/hbase/pom.xml
@@ -7,7 +7,7 @@
<parent>
<artifactId>stratosphere-addons</artifactId>
<groupId>eu.stratosphere</groupId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/jdbc/pom.xml b/stratosphere-addons/jdbc/pom.xml
index 7e6dd7c..c411c55 100644
--- a/stratosphere-addons/jdbc/pom.xml
+++ b/stratosphere-addons/jdbc/pom.xml
@@ -8,7 +8,7 @@
<parent>
<artifactId>stratosphere-addons</artifactId>
<groupId>eu.stratosphere</groupId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/pom.xml b/stratosphere-addons/pom.xml
index 4effcdc..d6f1214 100644
--- a/stratosphere-addons/pom.xml
+++ b/stratosphere-addons/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/spargel/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/spargel/pom.xml b/stratosphere-addons/spargel/pom.xml
index 1ebe79e..bf35a14 100644
--- a/stratosphere-addons/spargel/pom.xml
+++ b/stratosphere-addons/spargel/pom.xml
@@ -7,7 +7,7 @@
<parent>
<artifactId>stratosphere-addons</artifactId>
<groupId>eu.stratosphere</groupId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-addons/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/yarn/pom.xml b/stratosphere-addons/yarn/pom.xml
index 1f31a03..2da1394 100644
--- a/stratosphere-addons/yarn/pom.xml
+++ b/stratosphere-addons/yarn/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-addons</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-clients/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-clients/pom.xml b/stratosphere-clients/pom.xml
index d812d3c..1562f19 100644
--- a/stratosphere-clients/pom.xml
+++ b/stratosphere-clients/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-compiler/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/pom.xml b/stratosphere-compiler/pom.xml
index 2de4a6a..4a727a8 100644
--- a/stratosphere-compiler/pom.xml
+++ b/stratosphere-compiler/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-core/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-core/pom.xml b/stratosphere-core/pom.xml
index 27f359d..7512969 100644
--- a/stratosphere-core/pom.xml
+++ b/stratosphere-core/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-dist/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-dist/pom.xml b/stratosphere-dist/pom.xml
index e9e3ffd..da0bf01 100644
--- a/stratosphere-dist/pom.xml
+++ b/stratosphere-dist/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/pom.xml b/stratosphere-examples/pom.xml
index aff3aca..e1b0368 100644
--- a/stratosphere-examples/pom.xml
+++ b/stratosphere-examples/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-examples/stratosphere-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/pom.xml b/stratosphere-examples/stratosphere-java-examples/pom.xml
index 43500e8..24302c0 100644
--- a/stratosphere-examples/stratosphere-java-examples/pom.xml
+++ b/stratosphere-examples/stratosphere-java-examples/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-examples</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-examples/stratosphere-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-scala-examples/pom.xml b/stratosphere-examples/stratosphere-scala-examples/pom.xml
index ea8a3c9..da94c3a 100644
--- a/stratosphere-examples/stratosphere-scala-examples/pom.xml
+++ b/stratosphere-examples/stratosphere-scala-examples/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-examples</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-java/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-java/pom.xml b/stratosphere-java/pom.xml
index 7f1fe29..4df0ea4 100644
--- a/stratosphere-java/pom.xml
+++ b/stratosphere-java/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/pom.xml b/stratosphere-quickstart/pom.xml
index fa4a668..fa08572 100644
--- a/stratosphere-quickstart/pom.xml
+++ b/stratosphere-quickstart/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/quickstart-java/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-java/pom.xml b/stratosphere-quickstart/quickstart-java/pom.xml
index f813eb1..6da8c93 100644
--- a/stratosphere-quickstart/quickstart-java/pom.xml
+++ b/stratosphere-quickstart/quickstart-java/pom.xml
@@ -9,7 +9,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-quickstart</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml b/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
index 82d810c..0a51cc8 100644
--- a/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/stratosphere-quickstart/quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -20,12 +20,12 @@
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/quickstart-scala/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-scala/pom.xml b/stratosphere-quickstart/quickstart-scala/pom.xml
index 91cf2d4..afffef1 100644
--- a/stratosphere-quickstart/quickstart-scala/pom.xml
+++ b/stratosphere-quickstart/quickstart-scala/pom.xml
@@ -9,7 +9,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-quickstart</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml b/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 4910746..70a17af 100644
--- a/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -22,12 +22,12 @@
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-scala</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/pom.xml b/stratosphere-runtime/pom.xml
index 13f6cfb..8d9d175 100644
--- a/stratosphere-runtime/pom.xml
+++ b/stratosphere-runtime/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-scala/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-scala/pom.xml b/stratosphere-scala/pom.xml
index c9c6eb3..f511825 100644
--- a/stratosphere-scala/pom.xml
+++ b/stratosphere-scala/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/pom.xml b/stratosphere-test-utils/pom.xml
index 29fa208..ec3695e 100644
--- a/stratosphere-test-utils/pom.xml
+++ b/stratosphere-test-utils/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/stratosphere-tests/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-tests/pom.xml b/stratosphere-tests/pom.xml
index 7dec412..04eaeba 100644
--- a/stratosphere-tests/pom.xml
+++ b/stratosphere-tests/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <version>0.5.1</version>
<relativePath>..</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/67bb703d/tools/change-version
----------------------------------------------------------------------
diff --git a/tools/change-version b/tools/change-version
index ef2497e..b82d2cd 100755
--- a/tools/change-version
+++ b/tools/change-version
@@ -12,4 +12,4 @@
# specific language governing permissions and limitations under the License.
########################################################################################################################
-find .. -name 'pom.xml' -type f -exec sed -i 's#<version>0.5</version>#<version>0.5.1-SNAPSHOT</version>#' {} \;
+find .. -name 'pom.xml' -type f -exec sed -i 's#<version>0.5.1-SNAPSHOT</version>#<version>0.5.1</version>#' {} \;