You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/12 23:03:57 UTC
[6/9] git commit: Allow multiple successive programs on the same
execution environment.
Allow multiple successive programs on the same execution environment.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2a165eeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2a165eeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2a165eeb
Branch: refs/heads/release-0.5.1
Commit: 2a165eeb5094915b7df81537f963136696e8cc8b
Parents: 51b793f
Author: StephanEwen <st...@tu-berlin.de>
Authored: Thu Jun 12 16:06:04 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:05 2014 +0200
----------------------------------------------------------------------
.../api/java/ExecutionEnvironment.java | 7 +-
.../api/java/io/DiscardingOutputFormat.java | 41 ++++++++++++
.../api/java/MultipleInvokationsTest.java | 68 ++++++++++++++++++++
3 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
index f04cddd..2f7aef3 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
@@ -88,7 +88,7 @@ public abstract class ExecutionEnvironment {
private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
- private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList();
+ private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
private int degreeOfParallelism = -1;
@@ -574,7 +574,7 @@ public abstract class ExecutionEnvironment {
* @param executable flag indicating whether the file should be executable
*/
public void registerCachedFile(String filePath, String name, boolean executable){
- this.cacheFile.add(new Tuple2(name, new DistributedCacheEntry(filePath, executable)));
+ this.cacheFile.add(new Tuple2<String, DistributedCacheEntry>(name, new DistributedCacheEntry(filePath, executable)));
}
/**
@@ -635,6 +635,9 @@ public abstract class ExecutionEnvironment {
throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
}
+ // clear all the sinks such that the next execution does not redo everything
+ this.sinks.clear();
+
return plan;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
new file mode 100644
index 0000000..44912c9
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
@@ -0,0 +1,41 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.io;
+
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.configuration.Configuration;
+
+/**
+ * An output format that simply discards all elements.
+ *
+ * @param <T> The type of the elements accepted by the output format.
+ */
+public class DiscardingOutputFormat<T> implements OutputFormat<T> {
+    private static final long serialVersionUID = 1L;
+
+    /** Intentionally empty: the given parameters are not read. */
+    @Override
+    public void configure(Configuration parameters) { /* no-op */ }
+    /** Intentionally empty: nothing is set up for the task. */
+    @Override
+    public void open(int taskNumber, int numTasks) { /* no-op */ }
+    /** Intentionally empty: the record is dropped without being stored or forwarded. */
+    @Override
+    public void writeRecord(T record) { /* no-op */ }
+    /** Intentionally empty: there is nothing to release. */
+    @Override
+    public void close() { /* no-op */ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
new file mode 100644
index 0000000..8159ec0
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
@@ -0,0 +1,68 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
+import eu.stratosphere.api.java.io.DiscardingOutputFormat;
+
+/**
+ * Checks that one {@link ExecutionEnvironment} can produce several program plans in a row,
+ * and that each plan contains only the sinks defined since the previous plan was created.
+ */
+public class MultipleInvokationsTest {
+    /**
+     * Builds two plans from the same environment and data set and inspects the sinks of each.
+     * @throws Exception if plan creation fails; propagated so JUnit reports the full stack trace
+     */
+    @Test
+    public void testMultipleInvocationsGetPlan() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // ----------- Execution 1 ---------------
+
+        DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
+        data.print().name("print1");
+        data.output(new DiscardingOutputFormat<String>()).name("output1");
+
+        {
+            Plan p = env.createProgramPlan();
+            // both sinks registered so far must show up, each fed by the single source
+            assertEquals(2, p.getDataSinks().size());
+            for (GenericDataSinkBase<?> sink : p.getDataSinks()) {
+                assertTrue(sink.getName().equals("print1") || sink.getName().equals("output1"));
+                assertEquals("source1", sink.getInput().getName());
+            }
+        }
+
+        // ----------- Execution 2 ---------------
+
+        data.writeAsText("/some/file/path").name("textsink");
+
+        {
+            Plan p = env.createProgramPlan();
+            // only the sink added after the first plan may be present
+            assertEquals(1, p.getDataSinks().size());
+            GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+            assertEquals("textsink", sink.getName());
+            assertEquals("source1", sink.getInput().getName());
+        }
+    }
+}