You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:06 UTC
[66/92] [abbrv] prefix all projects in addons and quickstarts with
flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
new file mode 100644
index 0000000..10ca85d
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
+import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip test for {@link JDBCOutputFormat}: rows are read from the "books" table through a
+ * {@link JDBCInputFormat}, written to the "newbooks" table through the output format, and then read
+ * back from "newbooks" and compared against {@link #dbData}.
+ *
+ * The test runs against an in-memory Derby database that is populated once per class.
+ */
+public class JDBCOutputFormatTest {
+
+    /** Class name of the embedded Derby JDBC driver. */
+    private static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
+
+    /** URL of the in-memory database as handed to the formats under test. */
+    private static final String DB_URL = "jdbc:derby:memory:ebookshop";
+
+    /** URL used by the set-up and clean-up helpers; creates the database on first connect. */
+    private static final String DB_URL_CREATE = DB_URL + ";create=true";
+
+    private JDBCInputFormat jdbcInputFormat;
+    private JDBCOutputFormat jdbcOutputFormat;
+
+    /** The rows inserted into "books"; the same rows are expected in "newbooks" after the copy. */
+    static final Value[][] dbData = {
+        {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
+        {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
+        {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
+        {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
+        {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
+
+    /**
+     * Redirects the Derby error log and creates the source and target tables.
+     */
+    @BeforeClass
+    public static void setUpClass() {
+        System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
+        prepareDerbyInputDatabase();
+        prepareDerbyOutputDatabase();
+    }
+
+    /**
+     * Drops both tables.
+     */
+    private static void cleanUpDerbyDatabases() {
+        executeUpdates("DROP TABLE books", "DROP TABLE newbooks");
+    }
+
+    /**
+     * Creates the "books" table and fills it with the rows mirrored in {@link #dbData}.
+     */
+    private static void prepareDerbyInputDatabase() {
+        executeUpdates(createTableSql("books"), insertBooksSql());
+    }
+
+    /**
+     * Creates the (empty) "newbooks" table that the output format writes to.
+     */
+    private static void prepareDerbyOutputDatabase() {
+        executeUpdates(createTableSql("newbooks"));
+    }
+
+    /**
+     * Builds the CREATE TABLE statement shared by the source and the target table.
+     *
+     * @param tableName The name of the table to create.
+     * @return The SQL statement.
+     */
+    private static String createTableSql(String tableName) {
+        StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE " + tableName + " (");
+        sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+        sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+        sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+        sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+        sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+        sqlQueryBuilder.append("PRIMARY KEY (id))");
+        return sqlQueryBuilder.toString();
+    }
+
+    /**
+     * Builds the INSERT statement that loads the five test rows into "books".
+     *
+     * @return The SQL statement.
+     */
+    private static String insertBooksSql() {
+        StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+        sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
+        sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
+        sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
+        sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
+        sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
+        return sqlQueryBuilder.toString();
+    }
+
+    /**
+     * Opens a connection to the test database, runs the given statements in order and closes the
+     * statement and the connection again, also when one of the statements fails. Any failure is
+     * printed and turned into a test failure.
+     *
+     * @param sqlStatements The SQL update/DDL statements to execute.
+     */
+    private static void executeUpdates(String... sqlStatements) {
+        Connection connection = null;
+        Statement stat = null;
+        try {
+            Class.forName(DRIVER_CLASS);
+            connection = DriverManager.getConnection(DB_URL_CREATE);
+            stat = connection.createStatement();
+            for (String sql : sqlStatements) {
+                stat.executeUpdate(sql);
+            }
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+            Assert.fail("Could not load the JDBC driver: " + e.getMessage());
+        } catch (SQLException e) {
+            e.printStackTrace();
+            Assert.fail("Could not execute the SQL statements: " + e.getMessage());
+        } finally {
+            closeQuietly(stat, connection);
+        }
+    }
+
+    /**
+     * Closes the given JDBC resources, skipping those that are null. Errors during closing are
+     * ignored on purpose so that they cannot mask the failure that led here.
+     *
+     * @param stat The statement to close, possibly null.
+     * @param connection The connection to close, possibly null.
+     */
+    private static void closeQuietly(Statement stat, Connection connection) {
+        if (stat != null) {
+            try {
+                stat.close();
+            } catch (SQLException ignored) {
+                // best effort clean-up
+            }
+        }
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException ignored) {
+                // best effort clean-up
+            }
+        }
+    }
+
+    /**
+     * Releases the formats and drops the tables.
+     *
+     * NOTE(review): the tables are created once per class (see {@link #setUpClass()}) but dropped
+     * after every test. That is fine for the single test in this class; a second test would find
+     * no tables.
+     */
+    @After
+    public void tearDown() {
+        jdbcInputFormat = null;
+        jdbcOutputFormat = null;
+        cleanUpDerbyDatabases();
+    }
+
+    /**
+     * Copies all rows from "books" to "newbooks" and verifies the number, the types and the values
+     * of the copied rows.
+     *
+     * @throws IOException Declared by the formats' I/O methods; fails the test if thrown.
+     */
+    @Test
+    public void testJDBCOutputFormat() throws IOException {
+        String sourceTable = "books";
+        String targetTable = "newbooks";
+
+        Configuration cfg = new Configuration();
+        cfg.setString("driver", DRIVER_CLASS);
+        cfg.setString("url", DB_URL);
+        cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)");
+        cfg.setInteger("fields", 5);
+        cfg.setClass("type0", IntValue.class);
+        cfg.setClass("type1", StringValue.class);
+        cfg.setClass("type2", StringValue.class);
+        // NOTE(review): the price is checked as DoubleValue below but configured as FloatValue here;
+        // confirm against JDBCOutputFormat that this is intended.
+        cfg.setClass("type3", FloatValue.class);
+        cfg.setClass("type4", IntValue.class);
+
+        jdbcOutputFormat = new JDBCOutputFormat();
+        jdbcOutputFormat.configure(cfg);
+        jdbcOutputFormat.open(0,1);
+
+        jdbcInputFormat = new JDBCInputFormat(
+                DRIVER_CLASS,
+                DB_URL,
+                "select * from " + sourceTable);
+        jdbcInputFormat.configure(null);
+
+        // copy every row of the source table to the target table
+        Record record = new Record();
+        while (!jdbcInputFormat.reachedEnd()) {
+            jdbcInputFormat.nextRecord(record);
+            jdbcOutputFormat.writeRecord(record);
+        }
+
+        jdbcOutputFormat.close();
+        jdbcInputFormat.close();
+
+        // read the target table back and compare it row by row with the expected data
+        jdbcInputFormat = new JDBCInputFormat(
+                DRIVER_CLASS,
+                DB_URL,
+                "select * from " + targetTable);
+        jdbcInputFormat.configure(null);
+
+        int[] pos = {0, 1, 2, 3, 4};
+        int recordCount = 0;
+        while (!jdbcInputFormat.reachedEnd()) {
+            jdbcInputFormat.nextRecord(record);
+            Assert.assertEquals(5, record.getNumFields());
+            Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
+            Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
+            Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
+            Assert.assertEquals("Field 3 should be double", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
+            Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
+
+            Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
+            Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
+
+            recordCount++;
+        }
+        Assert.assertEquals(5, recordCount);
+
+        jdbcInputFormat.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/pom.xml b/flink-addons/flink-spargel/pom.xml
new file mode 100644
index 0000000..5136ad0
--- /dev/null
+++ b/flink-addons/flink-spargel/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-addons</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-spargel</artifactId>
+ <name>flink-spargel</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
new file mode 100644
index 0000000..3e1930c
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Iterates over the messages carried in the second field of a sequence of {@link Tuple2} records.
+ * The class implements {@link java.lang.Iterable} as well, so that it can be used directly in a
+ * <i>foreach</i> loop; {@link #iterator()} hands out this very object, which means the messages
+ * can be traversed only once per source.
+ */
+public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The tuples whose second field is returned as the message; not part of the serialized state. */
+    private transient Iterator<Tuple2<?, Message>> tuples;
+
+    /**
+     * Sets the tuple iterator from which the messages are taken.
+     *
+     * @param source The iterator over the tuples that hold the messages in their second field.
+     */
+    final void setSource(Iterator<Tuple2<?, Message>> source) {
+        tuples = source;
+    }
+
+    /**
+     * Returns this object, which is its own iterator.
+     *
+     * @return This iterator.
+     */
+    @Override
+    public Iterator<Message> iterator() {
+        return this;
+    }
+
+    /**
+     * Checks whether the underlying tuple iterator has another element.
+     *
+     * @return True, if another message is available, false otherwise.
+     */
+    @Override
+    public final boolean hasNext() {
+        return tuples.hasNext();
+    }
+
+    /**
+     * Advances the underlying tuple iterator and returns the second field of the tuple.
+     *
+     * @return The next message.
+     */
+    @Override
+    public final Message next() {
+        Tuple2<?, Message> tuple = tuples.next();
+        return tuple.f1;
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    @Override
+    public final void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
new file mode 100644
index 0000000..1b5cbde
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.spargel.java.OutgoingEdge;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
+ *
+ * For every vertex handed to {@link #sendMessages}, the outgoing edges can be consumed exactly once:
+ * either through {@link #getOutgoingEdges()} or through {@link #sendMessageToAllNeighbors(Object)}.
+ *
+ * @param <VertexKey> The type of the vertex key (the vertex identifier).
+ * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EdgeValue> The type of the values that are associated with the edges.
+ */
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // --------------------------------------------------------------------------------------------
+    //  Public API Methods
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * This method is invoked once per superstep for each vertex that was changed in that superstep.
+     * It needs to produce the messages that will be received by vertices in the next superstep.
+     *
+     * @param vertexKey The key of the vertex that was changed.
+     * @param vertexValue The value (state) of the vertex that was changed.
+     *
+     * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+     */
+    public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
+
+    /**
+     * This method is executed once per superstep before the vertex update function is invoked for each vertex.
+     *
+     * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+     */
+    public void preSuperstep() throws Exception {}
+
+    /**
+     * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
+     *
+     * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+     */
+    public void postSuperstep() throws Exception {}
+
+
+    /**
+     * Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
+     * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+     *
+     * The returned iterable can be traversed only once, and it returns the same (mutable)
+     * {@link OutgoingEdge} object for every edge, so edges must not be held across calls to
+     * <tt>next()</tt>.
+     *
+     * @return An iterable with all outgoing edges.
+     *
+     * @throws IllegalStateException Thrown, if the edges of the current vertex were already used.
+     */
+    @SuppressWarnings("unchecked")
+    public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
+        markEdgesUsed();
+
+        // exactly one of the two wrappers is created in init(), depending on whether the edges carry values
+        if (this.edgeWithValueIter != null) {
+            this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges);
+            return this.edgeWithValueIter;
+        } else {
+            this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
+            return this.edgeNoValueIter;
+        }
+    }
+
+    /**
+     * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
+     * This method is mutually exclusive to the method {@link #getOutgoingEdges()} and may be called only once.
+     *
+     * @param m The message to send.
+     *
+     * @throws IllegalStateException Thrown, if the edges of the current vertex were already used.
+     */
+    public void sendMessageToAllNeighbors(Message m) {
+        markEdgesUsed();
+
+        // the same output tuple is reused for all messages, only the target key changes
+        outValue.f1 = m;
+
+        while (edges.hasNext()) {
+            Tuple next = (Tuple) edges.next();
+            // field 1 of an edge tuple is the target vertex key, with and without edge value
+            VertexKey k = next.getField(1);
+            outValue.f0 = k;
+            out.collect(outValue);
+        }
+    }
+
+    /**
+     * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
+     * the next superstep will cause an exception due to a non-deliverable message.
+     *
+     * @param target The key (id) of the target vertex to message.
+     * @param m The message.
+     */
+    public void sendMessageTo(VertexKey target, Message m) {
+        outValue.f0 = target;
+        outValue.f1 = m;
+        out.collect(outValue);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Gets the number of the superstep, starting at <tt>1</tt>.
+     *
+     * @return The number of the current superstep.
+     */
+    public int getSuperstepNumber() {
+        return this.runtimeContext.getSuperstepNumber();
+    }
+
+    /**
+     * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+     * all aggregates globally once per superstep and makes them available in the next superstep.
+     *
+     * @param name The name of the aggregator.
+     * @return The aggregator registered under this name, or null, if no aggregator was registered.
+     */
+    public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+        return this.runtimeContext.<T>getIterationAggregator(name);
+    }
+
+    /**
+     * Get the aggregated value that an aggregator computed in the previous iteration.
+     *
+     * @param name The name of the aggregator.
+     * @return The aggregated value of the previous iteration.
+     */
+    public <T extends Value> T getPreviousIterationAggregate(String name) {
+        return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+    }
+
+    /**
+     * Gets the broadcast data set registered under the given name. Broadcast data sets
+     * are available on all parallel instances of a function. They can be registered via
+     * {@link VertexCentricIteration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
+     *
+     * @param name The name under which the broadcast set is registered.
+     * @return The broadcast data set.
+     */
+    public <T> Collection<T> getBroadcastSet(String name) {
+        return this.runtimeContext.<T>getBroadcastVariable(name);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  internal methods and state
+    // --------------------------------------------------------------------------------------------
+
+    /** The reusable tuple (target key, message) that is handed to the collector. */
+    private Tuple2<VertexKey, Message> outValue;
+
+    private IterationRuntimeContext runtimeContext;
+
+    /** The edge tuples of the vertex that is currently processed. */
+    private Iterator<?> edges;
+
+    private Collector<Tuple2<VertexKey, Message>> out;
+
+    /** Edge wrapper for edges without values; only set if init() was called with hasEdgeValue == false. */
+    private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
+
+    /** Edge wrapper for edges with values; only set if init() was called with hasEdgeValue == true. */
+    private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;
+
+    /** Flag marking that the edges of the current vertex were already consumed. */
+    private boolean edgesUsed;
+
+
+    /**
+     * Sets the runtime context and creates the reusable output tuple and the edge wrapper.
+     *
+     * @param context The runtime context of the iteration.
+     * @param hasEdgeValue True, if the edge tuples carry a value as third field, false otherwise.
+     */
+    void init(IterationRuntimeContext context, boolean hasEdgeValue) {
+        this.runtimeContext = context;
+        this.outValue = new Tuple2<VertexKey, Message>();
+
+        if (hasEdgeValue) {
+            this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
+        } else {
+            this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
+        }
+    }
+
+    /**
+     * Sets the edges and the collector for the next vertex and re-arms the "edges used" check.
+     *
+     * @param edges The iterator over the edge tuples of the vertex.
+     * @param out The collector that receives the (target key, message) tuples.
+     */
+    void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
+        this.edges = edges;
+        this.out = out;
+        this.edgesUsed = false;
+    }
+
+    /**
+     * Marks the edges of the current vertex as consumed.
+     *
+     * @throws IllegalStateException Thrown, if the edges were already consumed.
+     */
+    private void markEdgesUsed() {
+        if (edgesUsed) {
+            throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
+        }
+        edgesUsed = true;
+    }
+
+
+    /**
+     * Presents (source, target) tuples as outgoing edges whose edge value is null.
+     * Single-pass: {@link #iterator()} returns this object, and one edge object is reused for all elements.
+     */
+    private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue>
+        implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
+    {
+        private Iterator<Tuple2<VertexKey, VertexKey>> input;
+
+        private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
+
+
+        void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
+            this.input = input;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return input.hasNext();
+        }
+
+        @Override
+        public OutgoingEdge<VertexKey, EdgeValue> next() {
+            Tuple2<VertexKey, VertexKey> next = input.next();
+            edge.set(next.f1, null);
+            return edge;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
+            return this;
+        }
+    }
+
+
+    /**
+     * Presents (source, target, value) tuples as outgoing edges.
+     * Single-pass: {@link #iterator()} returns this object, and one edge object is reused for all elements.
+     */
+    private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue>
+        implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
+    {
+        private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
+
+        private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
+
+        void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) {
+            this.input = input;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return input.hasNext();
+        }
+
+        @Override
+        public OutgoingEdge<VertexKey, EdgeValue> next() {
+            Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next();
+            edge.set(next.f1, next.f2);
+            return edge;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
+            return this;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
new file mode 100644
index 0000000..aef9d0b
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+/**
+ * An <tt>OutgoingEdge</tt> describes one edge as seen from its source vertex: it holds the id of the
+ * target vertex and, optionally, a value associated with the edge (for example a weight or a distance).
+ * Edge values are present if the graph algorithm was initialized with the
+ * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
+ * method.
+ *
+ * The object is mutable; its contents are replaced through the package-private <tt>set</tt> method.
+ *
+ * @param <VertexKey> The type of the vertex key.
+ * @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold
+ *                    value, this type may be arbitrary.
+ */
+public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The id of the vertex that this edge points to. */
+    private VertexKey target;
+
+    /** The value attached to the edge, or null. */
+    private EdgeValue edgeValue;
+
+    /**
+     * Gets the target vertex id.
+     *
+     * @return The target vertex id.
+     */
+    public VertexKey target() {
+        return this.target;
+    }
+
+    /**
+     * Gets the value associated with the edge. The value may be null if the iteration was initialized with
+     * an edge data set without edge values.
+     * Typical examples of edge values are weights or distances of the path represented by the edge.
+     *
+     * @return The value associated with the edge.
+     */
+    public EdgeValue edgeValue() {
+        return this.edgeValue;
+    }
+
+    /**
+     * Replaces the target vertex id and the edge value of this object.
+     *
+     * @param target The id of the target vertex.
+     * @param edgeValue The value of the edge, possibly null.
+     */
+    void set(VertexKey target, EdgeValue edgeValue) {
+        this.edgeValue = edgeValue;
+        this.target = target;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
new file mode 100644
index 0000000..bb84cea
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.Validate;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.DeltaIteration;
+import org.apache.flink.api.java.functions.CoGroupFunction;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.Collector;
+
+/**
+ * This class represents iterative graph computations, programmed in a vertex-centric perspective.
+ * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
+ * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
+ * <p>
+ * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The
+ * algorithms send messages along the edges and update the state of vertices based on
+ * the old state and the incoming messages. All vertices have an initial state.
+ * The computation terminates once no vertex updates its state any more.
+ * Additionally, a maximum number of iterations (supersteps) may be specified.
+ * <p>
+ * The computation is here represented by two functions:
+ * <ul>
+ * <li>The {@link VertexUpdateFunction} receives incoming messages and may update the state for
+ * the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
+ * considered updated.</li>
+ * <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
+ * edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
+ * </ul>
+ * <p>
+ * Vertex-centric graph iterations are instantiated by the
+ * {@link #withPlainEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, or the
+ * {@link #withValuedEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, depending on whether
+ * the graph's edges are carrying values.
+ *
+ * @param <VertexKey> The type of the vertex key (the vertex identifier).
+ * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EdgeValue> The type of the values that are associated with the edges.
+ */
+public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
+ implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+{
+ private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
+
+ private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+
+ private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
+
+ private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
+
+ private final Map<String, Aggregator<?>> aggregators;
+
+ private final int maximumNumberOfIterations;
+
+ private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
+
+ private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
+
+ private final TypeInformation<Message> messageType;
+
+ private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
+
+ private String name;
+
+ private int parallelism = -1;
+
+ // ----------------------------------------------------------------------------------
+
+ private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+ MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+ DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
+ int maximumNumberOfIterations)
+ {
+ Validate.notNull(uf);
+ Validate.notNull(mf);
+ Validate.notNull(edgesWithoutValue);
+ Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+
+ // check that the edges are actually a valid tuple set of vertex key types
+ TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType();
+ Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples.");
+
+ TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
+ Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
+ && Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
+ "Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
+
+ this.updateFunction = uf;
+ this.messagingFunction = mf;
+ this.edgesWithoutValue = edgesWithoutValue;
+ this.edgesWithValue = null;
+ this.maximumNumberOfIterations = maximumNumberOfIterations;
+ this.aggregators = new HashMap<String, Aggregator<?>>();
+
+ this.messageType = getMessageType(mf);
+ }
+
+ private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+ MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+ DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
+ int maximumNumberOfIterations,
+ boolean edgeHasValueMarker)
+ {
+ Validate.notNull(uf);
+ Validate.notNull(mf);
+ Validate.notNull(edgesWithValue);
+ Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+
+ // check that the edges are actually a valid tuple set of vertex key types
+ TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType();
+ Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples.");
+
+ TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
+ Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
+ && Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
+ "The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
+
+ Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+
+ this.updateFunction = uf;
+ this.messagingFunction = mf;
+ this.edgesWithoutValue = null;
+ this.edgesWithValue = edgesWithValue;
+ this.maximumNumberOfIterations = maximumNumberOfIterations;
+ this.aggregators = new HashMap<String, Aggregator<?>>();
+
+ this.messageType = getMessageType(mf);
+ }
+
+ private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
+ return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
+ }
+
+ /**
+ * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
+ * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
+ * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+ *
+ * @param name The name of the aggregator, used to retrieve it and its aggregates during execution.
+ * @param aggregator The aggregator.
+ */
+ public void registerAggregator(String name, Aggregator<?> aggregator) {
+ this.aggregators.put(name, aggregator);
+ }
+
+ /**
+ * Adds a data set as a broadcast set to the messaging function.
+ *
+ * @param name The name under which the broadcast data is available in the messaging function.
+ * @param data The data set to be broadcasted.
+ */
+ public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+ this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+ }
+
+ /**
+ * Adds a data set as a broadcast set to the vertex update function.
+ *
+ * @param name The name under which the broadcast data is available in the vertex update function.
+ * @param data The data set to be broadcasted.
+ */
+ public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+ this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+ }
+
+ /**
+ * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
+ *
+ * @param name The name for the iteration.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Gets the name from this vertex-centric iteration.
+ *
+ * @return The name of the iteration.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Sets the degree of parallelism for the iteration.
+ *
+ * @param parallelism The degree of parallelism.
+ */
+ public void setParallelism(int parallelism) {
+ Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
+ this.parallelism = parallelism;
+ }
+
+ /**
+ * Gets the iteration's degree of parallelism.
+ *
+ * @return The iteration's parallelism, or -1, if not set.
+ */
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Operator behavior
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Sets the input data set for this operator. In the case of this operator this input data set represents
+ * the set of vertices with their initial state.
+ *
+ * @param inputData The input data set, which in the case of this operator represents the set of
+ * vertices with their initial state.
+ *
+ * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
+ */
+ @Override
+ public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
+ // sanity check that the input data set really consists of 2-tuples
+ TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType();
+ Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples.");
+
+ // check that the key type here is the same as for the edges
+ TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
+ TypeInformation<?> edgeType = edgesWithoutValue != null ? edgesWithoutValue.getType() : edgesWithValue.getType();
+ TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
+
+ Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) " +
+ "must be the same data type as the first fields of the edge data set (the source vertex id). " +
+ "Here, the key type for the vertex ids is '%s' and the key type for the edges is '%s'.", keyType, edgeKeyType);
+
+ this.initialVertices = inputData;
+ }
+
+ /**
+ * Creates the operator that represents this vertex-centric graph computation.
+ *
+ * @return The operator that represents this vertex-centric graph computation.
+ */
+ @Override
+ public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
+ if (this.initialVertices == null) {
+ throw new IllegalStateException("The input data set has not been set.");
+ }
+
+ // prepare some type information
+ TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
+ TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
+ TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
+
+ // set up the iteration operator
+ final String name = (this.name != null) ? this.name :
+ "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
+ final int[] zeroKeyPos = new int[] {0};
+
+ final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration =
+ this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
+ iteration.name(name);
+ iteration.parallelism(parallelism);
+
+ // register all aggregators
+ for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
+ iteration.registerAggregator(entry.getKey(), entry.getValue());
+ }
+
+ // build the messaging function (co group)
+ CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
+ if (edgesWithoutValue != null) {
+ MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(messagingFunction, messageTypeInfo);
+ messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
+ }
+ else {
+ MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
+ messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
+ }
+
+ // configure coGroup message function with name and broadcast variables
+ messages = messages.name("Messaging");
+ for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
+ messages = messages.withBroadcastSet(e.f1, e.f0);
+ }
+
+ VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+
+ // build the update function (co group)
+ CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
+ messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+ // configure coGroup update function with name and broadcast variables
+ updates = updates.name("Vertex State Updates");
+ for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
+ updates = updates.withBroadcastSet(e.f1, e.f0);
+ }
+
+ // let the operator know that we preserve the key field
+ updates.withConstantSetFirst("0").withConstantSetSecond("0");
+
+ return iteration.closeWith(updates, updates);
+
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Constructor builders to avoid signature conflicts with generic type erasure
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new vertex-centric iteration operator for graphs where the edges are not associated with a value.
+ *
+ * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
+ * @param vertexUpdateFunction The function that updates the state of the vertices from the incoming messages.
+ * @param messagingFunction The function that turns changed vertex states into messages along the edges.
+ *
+ * @param <VertexKey> The type of the vertex key (the vertex identifier).
+ * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ *
+ * @return An instance of the vertex-centric graph computation operator.
+ */
+ public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message>
+ VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(
+ DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
+ VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
+ MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
+ int maximumNumberOfIterations)
+ {
+ @SuppressWarnings("unchecked")
+ MessagingFunction<VertexKey, VertexValue, Message, Object> tmf =
+ (MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
+
+ return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations);
+ }
+
+ /**
+ * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
+ * a weight or distance).
+ *
+ * @param edgesWithValue The data set containing edges. Edges are represented as 3-tuples: (source-id, target-id, edge-value)
+ * @param uf The function that updates the state of the vertices from the incoming messages.
+ * @param mf The function that turns changed vertex states into messages along the edges.
+ *
+ * @param <VertexKey> The type of the vertex key (the vertex identifier).
+ * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EdgeValue> The type of the values that are associated with the edges.
+ *
+ * @return An instance of the vertex-centric graph computation operator.
+ */
+ public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
+ VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(
+ DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
+ VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+ MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+ int maximumNumberOfIterations)
+ {
+ return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Wrapping UDFs
+ // --------------------------------------------------------------------------------------------
+
+ private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
+ extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+ implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
+ {
+ private static final long serialVersionUID = 1L;
+
+ private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
+
+ private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+
+ private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;
+
+
+ private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
+ TypeInformation<Tuple2<VertexKey, VertexValue>> resultType)
+ {
+ this.vertexUpdateFunction = vertexUpdateFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public void coGroup(Iterator<Tuple2<VertexKey, Message>> messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex,
+ Collector<Tuple2<VertexKey, VertexValue>> out)
+ throws Exception
+ {
+ if (vertex.hasNext()) {
+ Tuple2<VertexKey, VertexValue> vertexState = vertex.next();
+
+ @SuppressWarnings("unchecked")
+ Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages;
+ messageIter.setSource(downcastIter);
+
+ vertexUpdateFunction.setOutput(vertexState, out);
+ vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
+ } else {
+ if (messages.hasNext()) {
+ String message = "Target vertex does not exist!.";
+ try {
+ Tuple2<VertexKey, Message> next = messages.next();
+ message = "Target vertex '" + next.f0 + "' does not exist!.";
+ } catch (Throwable t) {}
+ throw new Exception(message);
+ } else {
+ throw new Exception();
+ }
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+ this.vertexUpdateFunction.init(getIterationRuntimeContext());
+ }
+ this.vertexUpdateFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.vertexUpdateFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+ /*
+ * UDF that encapsulates the message sending function for graphs where the edges have no associated values.
+ */
+ private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
+ extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+ implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
+ {
+ private static final long serialVersionUID = 1L;
+
+ private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;
+
+ private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
+
+
+ private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
+ TypeInformation<Tuple2<VertexKey, Message>> resultType)
+ {
+ this.messagingFunction = messagingFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> edges,
+ Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+ throws Exception
+ {
+ if (state.hasNext()) {
+ Tuple2<VertexKey, VertexValue> newVertexState = state.next();
+ messagingFunction.set((Iterator<?>) edges, out);
+ messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+ this.messagingFunction.init(getIterationRuntimeContext(), false);
+ }
+
+ this.messagingFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.messagingFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+ /*
+ * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
+ */
+ private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
+ extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+ implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
+ {
+ private static final long serialVersionUID = 1L;
+
+ private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+
+ private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
+
+
+ private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
+ TypeInformation<Tuple2<VertexKey, Message>> resultType)
+ {
+ this.messagingFunction = messagingFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
+ Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+ throws Exception
+ {
+ if (state.hasNext()) {
+ Tuple2<VertexKey, VertexValue> newVertexState = state.next();
+ messagingFunction.set((Iterator<?>) edges, out);
+ messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+ this.messagingFunction.init(getIterationRuntimeContext(), true);
+ }
+
+ this.messagingFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.messagingFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
+ return this.resultType;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
new file mode 100644
index 0000000..c072754
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * This class must be extended by functions that compute the state of the vertex depending on the old state and the
+ * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
+ * invoked once per vertex per superstep.
+ *
+ * @param <VertexKey> The vertex key type.
+ * @param <VertexValue> The vertex value type.
+ * @param <Message> The message type.
+ */
+public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // --------------------------------------------------------------------------------------------
+ // Public API Methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
+ * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
+ * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
+ *
+ * @param vertexKey The key (identifier) of the vertex.
+ * @param vertexValue The value (state) of the vertex.
+ * @param inMessages The incoming messages to this vertex.
+ *
+ * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+ */
+ public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
+
+ /**
+ * This method is executed once per superstep before the vertex update function is invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+ */
+ public void preSuperstep() throws Exception {}
+
+ /**
+ * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+ */
+ public void postSuperstep() throws Exception {}
+
+ /**
+ * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+ *
+ * @param newValue The new vertex value.
+ */
+ public void setNewVertexValue(VertexValue newValue) {
+ outVal.f1 = newValue;
+ out.collect(outVal);
+ }
+
+ /**
+ * Gets the number of the superstep, starting at <tt>1</tt>.
+ *
+ * @return The number of the current superstep.
+ */
+ public int getSuperstepNumber() {
+ return this.runtimeContext.getSuperstepNumber();
+ }
+
+ /**
+ * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+ * all aggregates globally once per superstep and makes them available in the next superstep.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregator registered under this name, or null, if no aggregator was registered.
+ */
+ public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+ return this.runtimeContext.<T>getIterationAggregator(name);
+ }
+
+ /**
+ * Get the aggregated value that an aggregator computed in the previous iteration.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregated value of the previous iteration.
+ */
+ public <T extends Value> T getPreviousIterationAggregate(String name) {
+ return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ }
+
+ /**
+ * Gets the broadcast data set registered under the given name. Broadcast data sets
+ * are available on all parallel instances of a function. They can be registered via
+ * {@link VertexCentricIteration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
+ *
+ * @param name The name under which the broadcast set is registered.
+ * @return The broadcast data set.
+ */
+ public <T> Collection<T> getBroadcastSet(String name) {
+ return this.runtimeContext.<T>getBroadcastVariable(name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // internal methods
+ // --------------------------------------------------------------------------------------------
+
+ private IterationRuntimeContext runtimeContext;
+
+ private Collector<Tuple2<VertexKey, VertexValue>> out;
+
+ private Tuple2<VertexKey, VertexValue> outVal;
+
+
+ void init(IterationRuntimeContext context) {
+ this.runtimeContext = context;
+ }
+
+ void setOutput(Tuple2<VertexKey, VertexValue> val, Collector<Tuple2<VertexKey, VertexValue>> out) {
+ this.out = out;
+ this.outVal = val;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
new file mode 100644
index 0000000..ea90feb
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Example program for the vertex-centric API (spargel): every vertex starts with its own id
+ * as component id, sends its current component id to all neighbors, and adopts the smallest
+ * id it receives whenever that id is lower than its current one.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class SpargelConnectedComponents {
+
+ /**
+  * Builds a small sample graph (vertex ids 0 to 10 and six edges), runs the vertex-centric
+  * iteration with at most 100 supersteps and prints the resulting (vertex, component) pairs.
+  */
+ public static void main(String[] args) throws Exception {
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     // vertices: the ids 0..10
+     DataSet<Long> vertexIds = env.generateSequence(0, 10);
+
+     // edges of the sample graph
+     DataSet<Tuple2<Long, Long>> edges = env.fromElements(
+             new Tuple2<Long, Long>(0L, 2L),
+             new Tuple2<Long, Long>(2L, 4L),
+             new Tuple2<Long, Long>(4L, 8L),
+             new Tuple2<Long, Long>(1L, 5L),
+             new Tuple2<Long, Long>(3L, 7L),
+             new Tuple2<Long, Long>(3L, 9L));
+
+     // every vertex initially forms its own component: (id) -> (id, id)
+     DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+
+     DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(
+             VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
+
+     result.print();
+     env.execute("Spargel Connected Components");
+ }
+
+ /**
+  * Update function: replaces the vertex value by the smallest received message,
+  * but only if that message is smaller than the current vertex value.
+  */
+ public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
+     @Override
+     public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+         long smallest = Long.MAX_VALUE;
+         for (long candidate : inMessages) {
+             if (candidate < smallest) {
+                 smallest = candidate;
+             }
+         }
+
+         // only emit an update when the component id actually decreases
+         if (smallest < vertexValue) {
+             setNewVertexValue(smallest);
+         }
+     }
+ }
+
+ /**
+  * Messaging function: forwards the vertex's current component id to all neighbors.
+  */
+ public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> {
+     @Override
+     public void sendMessages(Long vertexId, Long componentId) {
+         sendMessageToAllNeighbors(componentId);
+     }
+ }
+
+ /**
+  * A map function that duplicates a Long value into both fields of a 2-tuple:
+  * <pre>(Long value) -> (value, value)</pre>
+  */
+ public static final class IdAssigner extends MapFunction<Long, Tuple2<Long, Long>> {
+     @Override
+     public Tuple2<Long, Long> map(Long value) {
+         final Tuple2<Long, Long> vertexWithComponent = new Tuple2<Long, Long>(value, value);
+         return vertexWithComponent;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
new file mode 100644
index 0000000..c7fbaaa
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.OutgoingEdge;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
+ * In this implementation, the edges carry a weight (the transition probability).
+ * <p>
+ * The program generates its own input: 100 vertices with a uniform initial rank and, per
+ * vertex, a random number of outgoing edges to random targets. It then runs 20 supersteps.
+ */
+@SuppressWarnings("serial")
+public class SpargelPageRank {
+
+ /** Damping factor that {@link #main(String[])} passes to the {@link VertexRankUpdater}. */
+ private static final double BETA = 0.85;
+
+
+ public static void main(String[] args) throws Exception {
+     final int numVertices = 100;
+
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     // enumerate the sample vertices and assign an initial uniform probability (rank)
+     DataSet<Tuple2<Long, Double>> initialRanks = env.generateSequence(1, numVertices)
+             .map(new MapFunction<Long, Tuple2<Long, Double>>() {
+                 @Override
+                 public Tuple2<Long, Double> map(Long value) {
+                     return new Tuple2<Long, Double>(value, 1.0 / numVertices);
+                 }
+             });
+
+     // generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
+     DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
+             .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+                 @Override
+                 public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
+                     // may be 0, in which case the vertex gets no outgoing edges
+                     // (the division below is only evaluated inside the loop)
+                     int numOutEdges = (int) (Math.random() * (numVertices / 2));
+                     for (int i = 0; i < numOutEdges; i++) {
+                         // random target id in [1, numVertices]
+                         long target = (long) (Math.random() * numVertices) + 1;
+                         out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0 / numOutEdges));
+                     }
+                 }
+             });
+
+     DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(
+             VertexCentricIteration.withValuedEdges(edgesWithProbability,
+                     new VertexRankUpdater(numVertices, BETA), new RankMessenger(), 20));
+
+     result.print();
+     env.execute("Spargel PageRank");
+ }
+
+ /**
+  * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
+  * and then applying the damping formula.
+  */
+ public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+     private final long numVertices;
+     private final double beta;
+
+     /**
+      * @param numVertices The total number of vertices in the graph.
+      * @param beta The damping factor.
+      */
+     public VertexRankUpdater(long numVertices, double beta) {
+         this.numVertices = numVertices;
+         this.beta = beta;
+     }
+
+     @Override
+     public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
+         double rankSum = 0.0;
+         for (double msg : inMessages) {
+             rankSum += msg;
+         }
+
+         // apply the damping factor / random jump.
+         // Both terms use the instance's own beta, so that a damping factor passed to the
+         // constructor is applied consistently (previously the random jump used the outer
+         // class constant BETA regardless of the constructor argument).
+         double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+         setNewVertexValue(newRank);
+     }
+ }
+
+ /**
+  * Distributes the rank of a vertex among all target vertices according to the transition probability,
+  * which is associated with an edge as the edge value.
+  */
+ public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+     @Override
+     public void sendMessages(Long vertexId, Double newRank) {
+         for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
+             sendMessageTo(edge.target(), newRank * edge.edgeValue());
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
new file mode 100644
index 0000000..34c9ad8
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.OutgoingEdge;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
+ * In this implementation, the edges carry a weight (the transition probability).
+ * <p>
+ * In this variant the number of vertices is not handed to the functions as a constant.
+ * It is computed as a data set (by counting the vertices) and made available as a
+ * broadcast set named "count", both to the function that assigns the initial ranks and
+ * to the vertex update function.
+ */
+@SuppressWarnings("serial")
+public class SpargelPageRankCountingVertices {
+
+ /** Damping factor that {@link #main(String[])} passes to the {@link VertexRankUpdater}. */
+ private static final double BETA = 0.85;
+
+
+ public static void main(String[] args) throws Exception {
+     final int NUM_VERTICES = 100;
+
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     // a list of vertices
+     DataSet<Long> vertices = env.generateSequence(1, NUM_VERTICES);
+
+     // generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
+     DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
+             .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+                 @Override
+                 public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
+                     // may be 0, in which case the vertex gets no outgoing edges
+                     // (the division below is only evaluated inside the loop)
+                     int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
+                     for (int i = 0; i < numOutEdges; i++) {
+                         // random target id in [1, NUM_VERTICES]
+                         long target = (long) (Math.random() * NUM_VERTICES) + 1;
+                         out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0 / numOutEdges));
+                     }
+                 }
+             });
+
+     // ---------- start of the algorithm ---------------
+
+     // count the number of vertices: map every vertex to 1 and sum up
+     DataSet<Long> count = vertices
+             .map(new MapFunction<Long, Long>() {
+                 @Override
+                 public Long map(Long value) {
+                     return 1L;
+                 }
+             })
+             .reduce(new ReduceFunction<Long>() {
+                 @Override
+                 public Long reduce(Long value1, Long value2) {
+                     return value1 + value2;
+                 }
+             });
+
+     // assign an initial uniform probability (rank) to every vertex,
+     // using the broadcast vertex count
+     DataSet<Tuple2<Long, Double>> initialRanks = vertices
+             .map(new MapFunction<Long, Tuple2<Long, Double>>() {
+
+                 private long numVertices;
+
+                 @Override
+                 public void open(Configuration parameters) {
+                     // the "count" broadcast set holds the vertex count as its first element
+                     numVertices = getRuntimeContext().<Long>getBroadcastVariable("count").iterator().next();
+                 }
+
+                 @Override
+                 public Tuple2<Long, Double> map(Long value) {
+                     return new Tuple2<Long, Double>(value, 1.0 / numVertices);
+                 }
+             }).withBroadcastSet(count, "count");
+
+
+     VertexCentricIteration<Long, Double, Double, Double> iteration = VertexCentricIteration.withValuedEdges(edgesWithProbability,
+             new VertexRankUpdater(BETA), new RankMessenger(), 20);
+     iteration.addBroadcastSetForUpdateFunction("count", count);
+
+
+     DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(iteration);
+
+     result.print();
+     env.execute("Spargel PageRank");
+ }
+
+ /**
+  * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
+  * and then applying the damping formula. The number of vertices is read from the
+  * broadcast set "count" before each superstep.
+  */
+ public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+     private final double beta;
+     private long numVertices;
+
+     /**
+      * @param beta The damping factor.
+      */
+     public VertexRankUpdater(double beta) {
+         this.beta = beta;
+     }
+
+     @Override
+     public void preSuperstep() {
+         numVertices = this.<Long>getBroadcastSet("count").iterator().next();
+     }
+
+     @Override
+     public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
+         double rankSum = 0.0;
+         for (double msg : inMessages) {
+             rankSum += msg;
+         }
+
+         // apply the damping factor / random jump.
+         // Both terms use the instance's own beta, so that a damping factor passed to the
+         // constructor is applied consistently (previously the random jump used the outer
+         // class constant BETA regardless of the constructor argument).
+         double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+         setNewVertexValue(newRank);
+     }
+ }
+
+ /**
+  * Distributes the rank of a vertex among all target vertices according to the transition probability,
+  * which is associated with an edge as the edge value.
+  */
+ public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+     @Override
+     public void sendMessages(Long vertexId, Double newRank) {
+         for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
+             sendMessageTo(edge.target(), newRank * edge.edgeValue());
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
new file mode 100644
index 0000000..ab29471
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Value;
+
+
+/**
+ * A view on one outgoing edge: the key of the target vertex plus the value attached to the edge.
+ * <p>
+ * Instances are filled through the package-private {@link #set(Key, Value)} method. The edge
+ * only keeps references to the objects it is given, it does not copy them.
+ *
+ * @param <VertexKey> The type of the vertex key.
+ * @param <EdgeValue> The type of the value attached to the edge.
+ */
+public final class Edge<VertexKey extends Key<VertexKey>, EdgeValue extends Value> {
+
+ private VertexKey target;
+ private EdgeValue edgeValue;
+
+ /**
+  * @return The key of the vertex this edge points to, as passed to the last call of {@code set}.
+  */
+ public VertexKey target() {
+     return this.target;
+ }
+
+ /**
+  * @return The value attached to this edge, as passed to the last call of {@code set}.
+  */
+ public EdgeValue edgeValue() {
+     return this.edgeValue;
+ }
+
+ /**
+  * Points this edge to the given target key and edge value (by reference).
+  */
+ void set(VertexKey target, EdgeValue edgeValue) {
+     this.edgeValue = edgeValue;
+     this.target = target;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
new file mode 100644
index 0000000..25ad748
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.util.Iterator;
+
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+
+/**
+ * Iterator (and single-use iterable) over the messages contained in a sequence of records.
+ * <p>
+ * Every call to {@link #next()} reads field 1 of the next record into one shared message
+ * instance and returns that instance. The returned object is therefore overwritten by the
+ * following call to {@link #next()}.
+ *
+ * @param <Message> The type of the messages.
+ */
+public final class MessageIterator<Message extends Value> implements Iterator<Message>, Iterable<Message> {
+
+ /** The reused message object that each record's field 1 is read into. */
+ private final Message instance;
+
+ /** The records to read the messages from; must be set via setSource(...) before iterating. */
+ private Iterator<Record> source;
+
+ /**
+  * @param instance The message object that is reused for all returned messages.
+  */
+ public MessageIterator(Message instance) {
+     this.instance = instance;
+ }
+
+ /**
+  * Sets the records that subsequent calls to hasNext() / next() operate on.
+  */
+ public void setSource(Iterator<Record> source) {
+     this.source = source;
+ }
+
+ @Override
+ public boolean hasNext() {
+     return source.hasNext();
+ }
+
+ @Override
+ public Message next() {
+     final Record record = source.next();
+     // the message is stored in field 1 of the record
+     record.getFieldInto(1, instance);
+     return instance;
+ }
+
+ @Override
+ public void remove() {
+     throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Returns this object itself, so the messages can be consumed in a for-each loop.
+  */
+ @Override
+ public Iterator<Message> iterator() {
+     return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
new file mode 100644
index 0000000..026b366
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for functions that produce the messages of a vertex (record API variant).
+ * Subclasses implement {@link #sendMessages(Key, Value)} and, from within it, either
+ * iterate over the outgoing edges ({@link #getOutgoingEdges()}) and address targets
+ * individually ({@link #sendMessageTo(Key, Value)}), or send one message along all edges
+ * ({@link #sendMessageToAllNeighbors(Value)}).
+ * <p>
+ * The edges handed in via {@code set(...)} can be consumed only once, so
+ * {@link #getOutgoingEdges()} and {@link #sendMessageToAllNeighbors(Value)} are mutually
+ * exclusive (and each usable once) per call of {@code set(...)}.
+ *
+ * @param <VertexKey> The type of the vertex key.
+ * @param <VertexValue> The type of the vertex value.
+ * @param <Message> The type of the messages.
+ * @param <EdgeValue> The type of the values attached to the edges.
+ */
+public abstract class MessagingFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // --------------------------------------------------------------------------------------------
+ //  Public API Methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+  * Produces the messages for the given vertex.
+  *
+  * @param vertexKey The key of the vertex.
+  * @param vertexValue The value of the vertex.
+  * @throws Exception Implementations may forward exceptions.
+  */
+ public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
+
+ /**
+  * Hook with an empty default implementation.
+  * NOTE(review): presumably invoked once with the function's configuration before the
+  * first superstep - confirm against the driver code.
+  */
+ public void setup(Configuration config) throws Exception {}
+
+ /**
+  * Hook with an empty default implementation.
+  * NOTE(review): presumably invoked before each superstep - confirm against the driver code.
+  */
+ public void preSuperstep() throws Exception {}
+
+ /**
+  * Hook with an empty default implementation.
+  * NOTE(review): presumably invoked after each superstep - confirm against the driver code.
+  */
+ public void postSuperstep() throws Exception {}
+
+
+ /**
+  * Gets an iterator over the outgoing edges. The returned iterator and the edges it
+  * returns are reused objects.
+  *
+  * @return The iterator over the outgoing edges.
+  * @throws IllegalStateException If the edges were already requested, or already consumed
+  *         by {@link #sendMessageToAllNeighbors(Value)}.
+  */
+ public Iterator<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
+     markEdgesUsed();
+     edgeIter.set(edges);
+     return edgeIter;
+ }
+
+ /**
+  * Sends the given message along every edge.
+  *
+  * @param m The message to send.
+  * @throws IllegalStateException If the edges were already consumed by this method or
+  *         requested via {@link #getOutgoingEdges()}.
+  */
+ public void sendMessageToAllNeighbors(Message m) {
+     markEdgesUsed();
+     while (edges.hasNext()) {
+         Record next = edges.next();
+         // NOTE(review): the target key is read from field 1 here, whereas EdgesIterator.next()
+         // reads the target key from field 0 (and the edge value from field 1). Confirm the
+         // record layout of the edges; one of the two is possibly using the wrong index.
+         VertexKey k = next.getField(1, this.keyClass);
+         outValue.setField(0, k);
+         outValue.setField(1, m);
+         out.collect(outValue);
+     }
+ }
+
+ /**
+  * Sends the given message to the vertex with the given key. The message is emitted as a
+  * record with the target key in field 0 and the message in field 1.
+  *
+  * @param target The key of the vertex to send the message to.
+  * @param m The message to send.
+  */
+ public void sendMessageTo(VertexKey target, Message m) {
+     outValue.setField(0, target);
+     outValue.setField(1, m);
+     out.collect(outValue);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+  * @return The superstep number, as reported by the iteration runtime context.
+  */
+ public int getSuperstep() {
+     return this.runtimeContext.getSuperstepNumber();
+ }
+
+ /**
+  * Gets the iteration aggregator registered under the given name (delegates to the
+  * iteration runtime context).
+  */
+ public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+     return this.runtimeContext.<T>getIterationAggregator(name);
+ }
+
+ /**
+  * Gets the aggregate of the previous iteration for the given name (delegates to the
+  * iteration runtime context).
+  */
+ public <T extends Value> T getPreviousIterationAggregate(String name) {
+     return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ //  internal methods and state
+ // --------------------------------------------------------------------------------------------
+
+ /** Reused output record: field 0 holds the target key, field 1 the message. */
+ private Record outValue;
+
+ private IterationRuntimeContext runtimeContext;
+
+ /** The edge records of the current invocation, set via set(...). */
+ private Iterator<Record> edges;
+
+ private Collector<Record> out;
+
+ private EdgesIterator<VertexKey, EdgeValue> edgeIter;
+
+ private Class<VertexKey> keyClass;
+
+ /** True once the edges of the current invocation were requested or consumed. */
+ private boolean edgesUsed;
+
+
+ /**
+  * Guards the single-use edge iterator: fails if the edges were already handed out or
+  * consumed since the last call of set(...), otherwise marks them as used.
+  */
+ private void markEdgesUsed() {
+     if (edgesUsed) {
+         throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()'.");
+     }
+     edgesUsed = true;
+ }
+
+ /**
+  * Initializes the internal state: stores the runtime context, creates the reusable edge
+  * iterator and output record, and remembers the key class of the given key holder.
+  */
+ @SuppressWarnings("unchecked")
+ void init(IterationRuntimeContext context, VertexKey keyHolder, EdgeValue edgeValueHolder) {
+     this.runtimeContext = context;
+     this.edgeIter = new EdgesIterator<VertexKey, EdgeValue>(keyHolder, edgeValueHolder);
+     this.outValue = new Record();
+     // safe: keyHolder is a VertexKey, so its runtime class is a Class<? extends VertexKey>
+     this.keyClass = (Class<VertexKey>) keyHolder.getClass();
+ }
+
+ /**
+  * Sets the edges and the output collector for the next invocation and re-enables the
+  * use of the edges.
+  */
+ void set(Iterator<Record> edges, Collector<Record> out) {
+     this.edges = edges;
+     this.out = out;
+     this.edgesUsed = false;
+ }
+
+ /**
+  * Iterator that exposes edge records as {@link Edge} objects. It reuses one key holder,
+  * one edge value holder and one edge object for all returned elements.
+  */
+ private static final class EdgesIterator<VertexKey extends Key<VertexKey>, EdgeValue extends Value> implements Iterator<Edge<VertexKey, EdgeValue>> {
+
+     private Iterator<Record> input;
+     private final VertexKey keyHolder;
+     private final EdgeValue edgeValueHolder;
+
+     private final Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
+
+     EdgesIterator(VertexKey keyHolder, EdgeValue edgeValueHolder) {
+         this.keyHolder = keyHolder;
+         this.edgeValueHolder = edgeValueHolder;
+     }
+
+     void set(Iterator<Record> input) {
+         this.input = input;
+     }
+
+     @Override
+     public boolean hasNext() {
+         return input.hasNext();
+     }
+
+     @Override
+     public Edge<VertexKey, EdgeValue> next() {
+         Record next = input.next();
+         // NOTE(review): reads the target key from field 0 and the edge value from field 1,
+         // whereas sendMessageToAllNeighbors(...) reads the target key from field 1.
+         // Confirm the record layout of the edges.
+         next.getFieldInto(0, keyHolder);
+         next.getFieldInto(1, edgeValueHolder);
+         edge.set(keyHolder, edgeValueHolder);
+         return edge;
+     }
+
+     @Override
+     public void remove() {
+         throw new UnsupportedOperationException();
+     }
+ }
+}