You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:42 UTC
[18/63] [abbrv] git commit: Stubs for intermediate data set and
related classes
Stubs for intermediate data set and related classes
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c16f6d81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c16f6d81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c16f6d81
Branch: refs/heads/master
Commit: c16f6d816899d8db7ff7c809d0c26be611b4d561
Parents: 9035b6d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 30 17:12:10 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200
----------------------------------------------------------------------
.../runtime/jobgraph/AbstractJobVertex.java | 13 +++--
.../apache/flink/runtime/jobgraph/JobGraph.java | 22 ---------
.../nephele/jobgraph/IntermediateDataSet.java | 42 ++++++++++++++++
.../nephele/jobgraph/IntermediateDataSetID.java | 50 ++++++++++++++++++++
4 files changed, 100 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c16f6d81/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 7df76c3..cc7a5d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -42,19 +42,19 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
private static final String DEFAULT_NAME = "(unnamed vertex)";
/**
- * List of outgoing edges.
+ * List of produced data sets, one per writer.
*/
- private final ArrayList<JobEdge> forwardEdges = new ArrayList<JobEdge>();
+ private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
/**
- * List of incoming edges.
+ * List of edges with incoming data. One per Reader.
*/
- private final ArrayList<JobEdge> backwardEdges = new ArrayList<JobEdge>();
+ private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
/**
* The name of the vertex or task, respectively.
*/
- private String name;
+ private final String name;
/**
* The ID of the vertex.
@@ -86,6 +86,7 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
*/
protected Class<? extends AbstractInvokable> invokableClass;
+ // --------------------------------------------------------------------------------------------
/**
* Constructs a new job vertex and assigns it with the given name.
@@ -112,6 +113,8 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
this.id = id == null ? new JobVertexID() : id;
this.jobGraph = jobGraph;
}
+
+ // --------------------------------------------------------------------------------------------
/**
* Connects the job vertex to the specified job vertex.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c16f6d81/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 48d858a..39dc382 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -192,30 +192,8 @@ public class JobGraph implements IOReadableWritable {
*/
public AbstractJobVertex[] getAllJobVertices() {
return this.taskVertices.values().toArray(new AbstractJobVertex[this.taskVertices.size()]);
-
- int i = 0;
- final AbstractJobVertex[] vertices = new AbstractJobVertex[inputVertices.size() + outputVertices.size()
- + taskVertices.size()];
-
- final Iterator<AbstractJobInputVertex> iv = getInputVertices();
- while (iv.hasNext()) {
- vertices[i++] = iv.next();
- }
-
- final Iterator<AbstractJobOutputVertex> ov = getOutputVertices();
- while (ov.hasNext()) {
- vertices[i++] = ov.next();
- }
-
- final Iterator<JobTaskVertex> tv = getTaskVertices();
- while (tv.hasNext()) {
- vertices[i++] = tv.next();
- }
-
- return vertices;
}
-
/**
* Returns the ID of the job.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c16f6d81/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
new file mode 100644
index 0000000..fdc3375
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSet.java
@@ -0,0 +1,42 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobgraph;
+
+/**
+ * An intermediate data set is the data set produced by an operator - either a
+ * source or any intermediate operation.
+ *
+ * Intermediate data sets may be read by other operators, materialized, or
+ * discarded.
+ */
+public class IntermediateDataSet {
+
+ private final IntermediateDataSetID id; // the identifier
+
+ private final AbstractJobVertex producer; // the operation that produced this data set
+
+
+ public IntermediateDataSet(AbstractJobVertex producer) {
+ this(new IntermediateDataSetID(), producer);
+ }
+
+ public IntermediateDataSet(IntermediateDataSetID id, AbstractJobVertex producer) {
+ this.id = id;
+ this.producer = producer;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c16f6d81/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
new file mode 100644
index 0000000..ac12be9
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/IntermediateDataSetID.java
@@ -0,0 +1,50 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobgraph;
+
+import java.util.UUID;
+
+import eu.stratosphere.nephele.AbstractID;
+
+public class IntermediateDataSetID extends AbstractID {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a new random intermediate data set ID.
+ */
+ public IntermediateDataSetID() {
+ super();
+ }
+
+ /**
+ * Creates a new intermediate data set ID with the bytes of the given ID.
+ *
+ * @param from The ID to create this ID from.
+ */
+ public IntermediateDataSetID(AbstractID from) {
+ super(from);
+ }
+
+ /**
+ * Creates a new intermediate data set ID with the bytes of the given UUID.
+ *
+ * @param from The UUID to create this ID from.
+ */
+ public IntermediateDataSetID(UUID from) {
+ super(from.getLeastSignificantBits(), from.getMostSignificantBits());
+ }
+}