You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:42 UTC

[18/63] [abbrv] git commit: Stubs for intermediate data set and related classes

Stubs for intermediate data set and related classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c16f6d81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c16f6d81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c16f6d81

Branch: refs/heads/master
Commit: c16f6d816899d8db7ff7c809d0c26be611b4d561
Parents: 9035b6d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 30 17:12:10 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200

----------------------------------------------------------------------
 .../runtime/jobgraph/AbstractJobVertex.java     | 13 +++--
 .../apache/flink/runtime/jobgraph/JobGraph.java | 22 ---------
 .../nephele/jobgraph/IntermediateDataSet.java   | 42 ++++++++++++++++
 .../nephele/jobgraph/IntermediateDataSetID.java | 50 ++++++++++++++++++++
 4 files changed, 100 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c16f6d81/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 7df76c3..cc7a5d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -42,19 +42,19 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	private static final String DEFAULT_NAME = "(unnamed vertex)";
 	
 	/**
-	 * List of outgoing edges.
+	 * List of produced data sets, one per writer
 	 */
-	private final ArrayList<JobEdge> forwardEdges = new ArrayList<JobEdge>();
+	private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
 
 	/**
-	 * List of incoming edges.
+	 * List of edges with incoming data. One per Reader.
 	 */
-	private final ArrayList<JobEdge> backwardEdges = new ArrayList<JobEdge>();
+	private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
 
 	/**
 	 * The name of the vertex or task, respectively.
 	 */
-	private String name;
+	private final String name;
 
 	/**
 	 * The ID of the vertex.
@@ -86,6 +86,7 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	 */
 	protected Class<? extends AbstractInvokable> invokableClass;
 
+	// --------------------------------------------------------------------------------------------
 	
 	/**
 	 * Constructs a new job vertex and assigns it with the given name.
@@ -112,6 +113,8 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 		this.id = id == null ? new JobVertexID() : id;
 		this.jobGraph = jobGraph;
 	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * Connects the job vertex to the specified job vertex.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c16f6d81/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 48d858a..39dc382 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -192,30 +192,8 @@ public class JobGraph implements IOReadableWritable {
 	 */
 	public AbstractJobVertex[] getAllJobVertices() {
 		return this.taskVertices.values().toArray(new AbstractJobVertex[this.taskVertices.size()]);
-		
-		int i = 0;
-		final AbstractJobVertex[] vertices = new AbstractJobVertex[inputVertices.size() + outputVertices.size()
-			+ taskVertices.size()];
-
-		final Iterator<AbstractJobInputVertex> iv = getInputVertices();
-		while (iv.hasNext()) {
-			vertices[i++] = iv.next();
-		}
-
-		final Iterator<AbstractJobOutputVertex> ov = getOutputVertices();
-		while (ov.hasNext()) {
-			vertices[i++] = ov.next();
-		}
-
-		final Iterator<JobTaskVertex> tv = getTaskVertices();
-		while (tv.hasNext()) {
-			vertices[i++] = tv.next();
-		}
-
-		return vertices;
 	}
 
-
 	/**
 	 * Returns the ID of the job.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c16f6d81/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
new file mode 100644
index 0000000..fdc3375
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
@@ -0,0 +1,42 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobgraph;
+
+/**
+ * An intermediate data set is the data set produced by an operator - either a
+ * source or any intermediate operation.
+ * 
+ * Intermediate data sets may be read by other operators, materialized, or
+ * discarded.
+ */
+public class IntermediateDataSet {
+	
+	private final IntermediateDataSetID id; 		// the identifier
+	
+	private final AbstractJobVertex producer;		// the operation that produced this data set
+
+	
+	public IntermediateDataSet(AbstractJobVertex producer) {
+		this(new IntermediateDataSetID(), producer);
+	}
+	
+	public IntermediateDataSet(IntermediateDataSetID id, AbstractJobVertex producer) {
+		this.id = id;
+		this.producer = producer;
+	}
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c16f6d81/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
new file mode 100644
index 0000000..ac12be9
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
@@ -0,0 +1,50 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobgraph;
+
+import java.util.UUID;
+
+import eu.stratosphere.nephele.AbstractID;
+
+public class IntermediateDataSetID extends AbstractID {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new random intermediate data set ID.
+	 */
+	public IntermediateDataSetID() {
+		super();
+	}
+	
+	/**
+	 * Creates a new intermediate data set ID with the bytes of the given ID.
+	 * 
+	 * @param from The ID to create this ID from.
+	 */
+	public IntermediateDataSetID(AbstractID from) {
+		super(from);
+	}
+	
+	/**
+	 * Creates a new intermediate data set ID with the bytes of the given UUID.
+	 * 
+	 * @param from The UUID to create this ID from.
+	 */
+	public IntermediateDataSetID(UUID from) {
+		super(from.getLeastSignificantBits(), from.getMostSignificantBits());
+	}
+}