You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/12 23:03:57 UTC

[6/9] git commit: Allow multiple successive programs on the same execution environment.

Allow multiple successive programs on the same execution environment.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2a165eeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2a165eeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2a165eeb

Branch: refs/heads/release-0.5.1
Commit: 2a165eeb5094915b7df81537f963136696e8cc8b
Parents: 51b793f
Author: StephanEwen <st...@tu-berlin.de>
Authored: Thu Jun 12 16:06:04 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:48:05 2014 +0200

----------------------------------------------------------------------
 .../api/java/ExecutionEnvironment.java          |  7 +-
 .../api/java/io/DiscardingOutputFormat.java     | 41 ++++++++++++
 .../api/java/MultipleInvokationsTest.java       | 68 ++++++++++++++++++++
 3 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
index f04cddd..2f7aef3 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
@@ -88,7 +88,7 @@ public abstract class ExecutionEnvironment {
 	
 	private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
 	
-	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList();
+	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
 
 	private int degreeOfParallelism = -1;
 	
@@ -574,7 +574,7 @@ public abstract class ExecutionEnvironment {
 	 * @param executable flag indicating whether the file should be executable
 	 */
 	public void registerCachedFile(String filePath, String name, boolean executable){
-		this.cacheFile.add(new Tuple2(name, new DistributedCacheEntry(filePath, executable)));
+		this.cacheFile.add(new Tuple2<String, DistributedCacheEntry>(name, new DistributedCacheEntry(filePath, executable)));
 	}
 	
 	/**
@@ -635,6 +635,9 @@ public abstract class ExecutionEnvironment {
 			throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
 		}
 		
+		// clear all the sinks such that the next execution does not redo everything
+		this.sinks.clear();
+		
 		return plan;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
new file mode 100644
index 0000000..44912c9
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/DiscardingOutputFormat.java
@@ -0,0 +1,41 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.io;
+
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.configuration.Configuration;
+
+/**
+ * An output format that simply discards all elements.
+ *
+ * @param <T> The type of the elements accepted by the output format.
+ */
+public class DiscardingOutputFormat<T> implements OutputFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void configure(Configuration parameters) {}
+
+	@Override
+	public void open(int taskNumber, int numTasks) {}
+
+	@Override
+	public void writeRecord(T record) {}
+
+	@Override
+	public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2a165eeb/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
new file mode 100644
index 0000000..8159ec0
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/MultipleInvokationsTest.java
@@ -0,0 +1,68 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
+import eu.stratosphere.api.java.io.DiscardingOuputFormat;
+
+public class MultipleInvokationsTest {
+
+	@Test
+	public void testMultipleInvocationsGetPlan() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			// ----------- Execution 1 ---------------
+			
+			DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
+			data.print().name("print1");
+			data.output(new DiscardingOuputFormat<String>()).name("output1");
+			
+			{
+				Plan p = env.createProgramPlan();
+				
+				assertEquals(2, p.getDataSinks().size());
+				for (GenericDataSinkBase<?> sink : p.getDataSinks()) {
+					assertTrue(sink.getName().equals("print1") || sink.getName().equals("output1"));
+					assertEquals("source1", sink.getInput().getName());
+				}
+			}
+			
+			// ----------- Execution 2 ---------------
+			
+			data.writeAsText("/some/file/path").name("textsink");
+			
+			{
+				Plan p = env.createProgramPlan();
+			
+				assertEquals(1, p.getDataSinks().size());
+				GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+				assertEquals("textsink", sink.getName());
+				assertEquals("source1", sink.getInput().getName());
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}