You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:01 UTC

[61/92] [abbrv] prefix all projects in addons and quickstarts with flink-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index fcd0606..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc.example;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * Stand-alone example for the JDBC connector.
- *
- * NOTE: To run this example, you need the apache derby code in your classpath.
- * See the Maven file (pom.xml) for a reference to the derby dependency. You can
- * simply Change the scope of the Maven dependency from test to compile.
- */
-public class JDBCExample implements Program, ProgramDescription {
-
-	@Override
-	public Plan getPlan(String[] args) {
-		/*
-		 * In this example we use the constructor where the url contains all the settings that are needed.
-		 * You could also use the default constructor and deliver a Configuration with all the needed settings.
-		 * You also could set the settings to the source-instance.
-		 */
-		GenericDataSource<JDBCInputFormat> source = new GenericDataSource<JDBCInputFormat>(
-				new JDBCInputFormat(
-						"org.apache.derby.jdbc.EmbeddedDriver",
-						"jdbc:derby:memory:ebookshop",
-						"select * from books"),
-				"Data Source");
-
-		GenericDataSink sink = new GenericDataSink(new JDBCOutputFormat(), "Data Output");
-		JDBCOutputFormat.configureOutputFormat(sink)
-				.setDriver("org.apache.derby.jdbc.EmbeddedDriver")
-				.setUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-				.setClass(IntValue.class)
-				.setClass(StringValue.class)
-				.setClass(StringValue.class)
-				.setClass(FloatValue.class)
-				.setClass(IntValue.class);
-
-		sink.addInput(source);
-		return new Plan(sink, "JDBC Example Job");
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameter:";
-	}
-
-	/*
-	 * To run this example, you need the apache derby code in your classpath!
-	 */
-	public static void main(String[] args) throws Exception {
-
-		prepareTestDb();
-		JDBCExample tut = new JDBCExample();
-		JobExecutionResult res = LocalExecutor.execute(tut, args);
-		System.out.println("runtime: " + res.getNetRuntime());
-
-		System.exit(0);
-	}
-
-	private static void prepareTestDb() throws Exception {
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		Connection conn = DriverManager.getConnection(dbURL);
-
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-		
-		conn.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index 5816fa8..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import junit.framework.Assert;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCInputFormatTest {
-	JDBCInputFormat jdbcInputFormat;
-
-	static Connection conn;
-
-	static final Object[][] dbData = {
-		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
-		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
-		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
-	@BeforeClass
-	public static void setUpClass() {
-		try {
-			prepareDerbyDatabase();
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
-		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		conn = DriverManager.getConnection(dbURL);
-		createTable();
-		insertDataToSQLTable();
-		conn.close();
-	}
-
-	private static void createTable() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTable() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	@AfterClass
-	public static void tearDownClass() {
-		cleanUpDerbyDatabases();
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
-			conn = DriverManager.getConnection(dbURL);
-			Statement stat = conn.createStatement();
-			stat.executeUpdate("DROP TABLE books");
-			stat.close();
-			conn.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	@After
-	public void tearDown() {
-		jdbcInputFormat = null;
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidDriver() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("select * from books")
-				.finish();
-		jdbcInputFormat.open(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidURL() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-				.setQuery("select * from books")
-				.finish();
-		jdbcInputFormat.open(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidQuery() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("iamnotsql")
-				.finish();
-		jdbcInputFormat.open(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testIncompleteConfiguration() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setQuery("select * from books")
-				.finish();
-	}
-
-	@Test(expected = IOException.class)
-	public void testIncompatibleTuple() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("select * from books")
-				.finish();
-		jdbcInputFormat.open(null);
-		jdbcInputFormat.nextRecord(new Tuple2());
-	}
-
-	@Test
-	public void testJDBCInputFormat() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("select * from books")
-				.finish();
-		jdbcInputFormat.open(null);
-		Tuple5 tuple = new Tuple5();
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(tuple);
-			Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
-			Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
-			Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
-			Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
-			Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
-
-			for (int x = 0; x < 5; x++) {
-				Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
-			}
-			recordCount++;
-		}
-		Assert.assertEquals(5, recordCount);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index c1c899e..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import junit.framework.Assert;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
-	private JDBCInputFormat jdbcInputFormat;
-	private JDBCOutputFormat jdbcOutputFormat;
-
-	private static Connection conn;
-
-	static final Object[][] dbData = {
-		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
-		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
-		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
-	@BeforeClass
-	public static void setUpClass() throws SQLException {
-		try {
-			System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-			prepareDerbyDatabase();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		conn = DriverManager.getConnection(dbURL);
-		createTable("books");
-		createTable("newbooks");
-		insertDataToSQLTables();
-		conn.close();
-	}
-
-	private static void createTable(String tableName) throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
-		sqlQueryBuilder.append(tableName);
-		sqlQueryBuilder.append(" (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTables() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	@AfterClass
-	public static void tearDownClass() {
-		cleanUpDerbyDatabases();
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
-			conn = DriverManager.getConnection(dbURL);
-			Statement stat = conn.createStatement();
-			stat.executeUpdate("DROP TABLE books");
-			stat.executeUpdate("DROP TABLE newbooks");
-			stat.close();
-			conn.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	@After
-	public void tearDown() {
-		jdbcOutputFormat = null;
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidDriver() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidURL() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidQuery() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("iamnotsql")
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testIncompleteConfiguration() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
-				.finish();
-	}
-
-	@Test(expected = IOException.class)
-	public void testIncompatibleTuple() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-
-		Tuple3 tuple3 = new Tuple3();
-		tuple3.setField(4, 0);
-		tuple3.setField("hi", 1);
-		tuple3.setField(4.4, 2);
-
-		jdbcOutputFormat.writeRecord(tuple3);
-		jdbcOutputFormat.close();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testIncompatibleTypes() throws IOException {
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-
-		Tuple5 tuple5 = new Tuple5();
-		tuple5.setField(4, 0);
-		tuple5.setField("hello", 1);
-		tuple5.setField("world", 2);
-		tuple5.setField(0.99, 3);
-		tuple5.setField("imthewrongtype", 4);
-
-		jdbcOutputFormat.writeRecord(tuple5);
-		jdbcOutputFormat.close();
-	}
-
-	@Test
-	public void testJDBCOutputFormat() throws IOException {
-		String sourceTable = "books";
-		String targetTable = "newbooks";
-		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
-		String dbUrl = "jdbc:derby:memory:ebookshop";
-
-		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDBUrl(dbUrl)
-				.setDrivername(driverPath)
-				.setQuery("insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)")
-				.finish();
-		jdbcOutputFormat.open(0, 1);
-
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(driverPath)
-				.setDBUrl(dbUrl)
-				.setQuery("select * from " + sourceTable)
-				.finish();
-		jdbcInputFormat.open(null);
-
-		Tuple5 tuple = new Tuple5();
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(tuple);
-			jdbcOutputFormat.writeRecord(tuple);
-		}
-
-		jdbcOutputFormat.close();
-		jdbcInputFormat.close();
-
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(driverPath)
-				.setDBUrl(dbUrl)
-				.setQuery("select * from " + targetTable)
-				.finish();
-		jdbcInputFormat.open(null);
-
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(tuple);
-			Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
-			Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
-			Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
-			Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
-			Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
-
-			for (int x = 0; x < 5; x++) {
-				Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
-			}
-
-			recordCount++;
-		}
-		Assert.assertEquals(5, recordCount);
-
-		jdbcInputFormat.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
deleted file mode 100644
index 3032728..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.OutputStream;
-
-public class DevNullLogStream {
-
-	public static final OutputStream DEV_NULL = new OutputStream() {
-		public void write(int b) {}
-	};
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index 1bafb42..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
-* http://www.apache.org/licenses/LICENSE-2.0
- * 
-* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import junit.framework.Assert;
-
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCInputFormatTest {
-	JDBCInputFormat jdbcInputFormat;
-	Configuration config;
-	static Connection conn;
-	static final Value[][] dbData = {
-		{new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
-		{new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
-		{new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
-		{new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
-		{new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
-	@BeforeClass
-	public static void setUpClass() {
-		try {
-			prepareDerbyDatabase();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyDatabase() throws ClassNotFoundException {
-		System.setProperty("derby.stream.error.field","org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		createConnection(dbURL);
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		try {
-				String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-				Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-				conn = DriverManager.getConnection(dbURL);
-				Statement stat = conn.createStatement();
-				stat.executeUpdate("DROP TABLE books");
-				stat.close();
-				conn.close();
-			} catch (Exception e) {
-				e.printStackTrace();
-				Assert.fail();
-			} 
-	}
-	
-	/*
-	 Loads JDBC derby driver ; creates(if necessary) and populates database.
-	 */
-	private static void createConnection(String dbURL) {
-		try {
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			conn = DriverManager.getConnection(dbURL);
-			createTable();
-			insertDataToSQLTables();
-			conn.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void createTable() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("CREATE TABLE bookscontent (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("content BLOB(10K) DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTables() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("INSERT INTO bookscontent (id, title, content) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', CAST(X'7f454c4602' AS BLOB)),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', CAST(X'7f454c4602' AS BLOB)),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', CAST(X'7f454c4602' AS BLOB))");
-
-		stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-
-	@After
-	public void tearDown() {
-		jdbcInputFormat = null;
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidConnection() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:idontexist", "select * from books;");
-		jdbcInputFormat.configure(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidQuery() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "abc");
-		jdbcInputFormat.configure(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testInvalidDBType() {
-		jdbcInputFormat = new JDBCInputFormat("idontexist.Driver", "jdbc:derby:memory:ebookshop", "select * from books;");
-		jdbcInputFormat.configure(null);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testUnsupportedSQLType() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from bookscontent");
-		jdbcInputFormat.configure(null);
-		jdbcInputFormat.nextRecord(new Record());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testNotConfiguredFormatNext() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
-		jdbcInputFormat.nextRecord(new Record());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testNotConfiguredFormatEnd() {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
-		jdbcInputFormat.reachedEnd();
-	}
-
-	@Test
-	public void testJDBCInputFormat() throws IOException {
-		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
-		jdbcInputFormat.configure(null);
-		Record record = new Record();
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(record);
-			Assert.assertEquals(5, record.getNumFields());
-			Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
-			Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
-			Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
-			Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
-			Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
-			int[] pos = {0, 1, 2, 3, 4};
-			Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
-			Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
-			recordCount++;
-		}
-		Assert.assertEquals(5, recordCount);
-		
-		cleanUpDerbyDatabases();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index 10ca85d..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import junit.framework.Assert;
-
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
-	private JDBCInputFormat jdbcInputFormat;
-	private JDBCOutputFormat jdbcOutputFormat;
-
-	private static Connection conn;
-
-	static final Value[][] dbData = {
-		{new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
-		{new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
-		{new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
-		{new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
-		{new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
-	@BeforeClass
-	public static void setUpClass() {
-		try {
-			System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-			prepareDerbyInputDatabase();
-			prepareDerbyOutputDatabase();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		 try {
-			 String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			 Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			 conn = DriverManager.getConnection(dbURL);
-			 Statement stat = conn.createStatement();
-			 stat.executeUpdate("DROP TABLE books");
-			 stat.executeUpdate("DROP TABLE newbooks");
-			 stat.close();
-			 conn.close();
-		 } catch (Exception e) {
-			 e.printStackTrace();
-			 Assert.fail();
-		 } 
-	}
-	
-	private static void prepareDerbyInputDatabase() throws ClassNotFoundException {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			conn = DriverManager.getConnection(dbURL);
-			createTableBooks();
-			insertDataToSQLTables();
-			conn.close();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		} catch (SQLException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyOutputDatabase() throws ClassNotFoundException {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			conn = DriverManager.getConnection(dbURL);
-			createTableNewBooks();
-			conn.close();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		} catch (SQLException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void createTableBooks() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void createTableNewBooks() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTables() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-
-	@After
-	public void tearDown() {
-		jdbcOutputFormat = null;
-		cleanUpDerbyDatabases();
-	}
-
-	@Test
-	public void testJDBCOutputFormat() throws IOException {
-		String sourceTable = "books";
-		String targetTable = "newbooks";
-		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
-		String dbUrl = "jdbc:derby:memory:ebookshop";
-
-		Configuration cfg = new Configuration();
-		cfg.setString("driver", driverPath);
-		cfg.setString("url", dbUrl);
-		cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)");
-		cfg.setInteger("fields", 5);
-		cfg.setClass("type0", IntValue.class);
-		cfg.setClass("type1", StringValue.class);
-		cfg.setClass("type2", StringValue.class);
-		cfg.setClass("type3", FloatValue.class);
-		cfg.setClass("type4", IntValue.class);
-
-		jdbcOutputFormat = new JDBCOutputFormat();
-		jdbcOutputFormat.configure(cfg);
-		jdbcOutputFormat.open(0,1);
-
-		jdbcInputFormat = new JDBCInputFormat(
-				driverPath,
-				dbUrl,
-				"select * from " + sourceTable);
-		jdbcInputFormat.configure(null);
-
-		Record record = new Record();
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(record);
-			jdbcOutputFormat.writeRecord(record);
-		}
-
-		jdbcOutputFormat.close();
-		jdbcInputFormat.close();
-
-		jdbcInputFormat = new JDBCInputFormat(
-				driverPath,
-				dbUrl,
-				"select * from " + targetTable);
-		jdbcInputFormat.configure(null);
-
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(record);
-			Assert.assertEquals(5, record.getNumFields());
-			Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
-			Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
-			Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
-			Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
-			Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
-			int[] pos = {0, 1, 2, 3, 4};
-			Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
-			Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
-			recordCount++;
-		}
-		Assert.assertEquals(5, recordCount);
-
-		jdbcInputFormat.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml
index d9fcfb0..e10df81 100644
--- a/flink-addons/pom.xml
+++ b/flink-addons/pom.xml
@@ -29,10 +29,10 @@
 	<packaging>pom</packaging>
 
 	<modules>
-		<module>avro</module>
-		<module>jdbc</module>
-		<module>spargel</module>
-		<module>hadoop-compatibility</module>
+		<module>flink-avro</module>
+		<module>flink-jdbc</module>
+		<module>flink-spargel</module>
+		<module>flink-hadoop-compatibility</module>
 	</modules>
 	
 	<!-- See main pom.xml for explanation of profiles -->
@@ -58,7 +58,7 @@
 				</property>
 			</activation>
 			<modules>
-				<module>hbase</module>
+				<module>flink-hbase</module>
 			</modules>
 		</profile>
 		<profile>
@@ -70,7 +70,7 @@
 				</property>
 			</activation>
 			<modules>
-				<module>yarn</module>
+				<module>flink-yarn</module>
 			</modules>
 		</profile>
 	</profiles>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/pom.xml b/flink-addons/spargel/pom.xml
deleted file mode 100644
index 150cb37..0000000
--- a/flink-addons/spargel/pom.xml
+++ /dev/null
@@ -1,55 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<artifactId>flink-addons</artifactId>
-		<groupId>org.apache.flink</groupId>
-		<version>0.6-incubating-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	
-	<artifactId>spargel</artifactId>
-	<name>spargel</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
deleted file mode 100644
index 3e1930c..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
- * the <i>foreach</i> syntax.
- */
-public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private transient Iterator<Tuple2<?, Message>> source;
-	
-	
-	final void setSource(Iterator<Tuple2<?, Message>> source) {
-		this.source = source;
-	}
-	
-	@Override
-	public final boolean hasNext() {
-		return this.source.hasNext();
-	}
-	
-	@Override
-	public final Message next() {
-		return this.source.next().f1;
-	}
-
-	@Override
-	public final void remove() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Iterator<Message> iterator() {
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
deleted file mode 100644
index 1b5cbde..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
- * 
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
-	 * It needs to produce the messages that will be received by vertices in the next superstep.
-	 * 
-	 * @param vertexKey The key of the vertex that was changed.
-	 * @param vertexValue The value (state) of the vertex that was changed.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-	 */
-	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
-	
-	/**
-	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed one per superstep after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	public void postSuperstep() throws Exception {}
-	
-	
-	/**
-	 * Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
-	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
-	 * 
-	 * @return An iterator with all outgoing edges.
-	 */
-	@SuppressWarnings("unchecked")
-	public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
-		}
-		edgesUsed = true;
-		
-		if (this.edgeWithValueIter != null) {
-			this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges);
-			return this.edgeWithValueIter;
-		} else {
-			this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
-			return this.edgeNoValueIter;
-		}
-	}
-	
-	/**
-	 * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
-	 * This method is mutually exclusive to the method {@link #getOutgoingEdges()} and may be called only once.
-	 * 
-	 * @param m The message to send.
-	 */
-	public void sendMessageToAllNeighbors(Message m) {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
-		}
-		
-		edgesUsed = true;
-		
-		outValue.f1 = m;
-		
-		while (edges.hasNext()) {
-			Tuple next = (Tuple) edges.next();
-			VertexKey k = next.getField(1);
-			outValue.f0 = k;
-			out.collect(outValue);
-		}
-	}
-	
-	/**
-	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
-	 * the next superstep will cause an exception due to a non-deliverable message.
-	 * 
-	 * @param target The key (id) of the target vertex to message.
-	 * @param m The message.
-	 */
-	public void sendMessageTo(VertexKey target, Message m) {
-		outValue.f0 = target;
-		outValue.f1 = m;
-		out.collect(outValue);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator is combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link VertexCentricIteration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  internal methods and state
-	// --------------------------------------------------------------------------------------------
-	
-	private Tuple2<VertexKey, Message> outValue;
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Iterator<?> edges;
-	
-	private Collector<Tuple2<VertexKey, Message>> out;
-	
-	private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
-	
-	private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;
-	
-	private boolean edgesUsed;
-	
-	
-	void init(IterationRuntimeContext context, boolean hasEdgeValue) {
-		this.runtimeContext = context;
-		this.outValue = new Tuple2<VertexKey, Message>();
-		
-		if (hasEdgeValue) {
-			this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
-		} else {
-			this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
-		}
-	}
-	
-	void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
-		this.edges = edges;
-		this.out = out;
-		this.edgesUsed = false;
-	}
-	
-	
-	
-	private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> 
-		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
-	{
-		private Iterator<Tuple2<VertexKey, VertexKey>> input;
-		
-		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
-		
-		
-		void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public OutgoingEdge<VertexKey, EdgeValue> next() {
-			Tuple2<VertexKey, VertexKey> next = input.next();
-			edge.set(next.f1, null);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
-			return this;
-		}
-	}
-	
-	
-	private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> 
-		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
-	{
-		private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
-		
-		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
-		
-		void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public OutgoingEdge<VertexKey, EdgeValue> next() {
-			Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next();
-			edge.set(next.f1, next.f2);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-		@Override
-		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
-			return this;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
deleted file mode 100644
index aef9d0b..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-/**
- * <tt>Edge</tt> objects represent edges between vertices. Edges are defined by their source and target
- * vertex id. Edges may have an associated value (for example a weight or a distance), if the
- * graph algorithm was initialized with the
- * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
- * method.
- *
- * @param <VertexKey> The type of the vertex key.
- * @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold
- *                    value, this type may be arbitrary.
- */
-public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private VertexKey target;
-	
-	private EdgeValue edgeValue;
-	
-	void set(VertexKey target, EdgeValue edgeValue) {
-		this.target = target;
-		this.edgeValue = edgeValue;
-	}
-	
-	/**
-	 * Gets the target vertex id.
-	 * 
-	 * @return The target vertex id.
-	 */
-	public VertexKey target() {
-		return target;
-	}
-	
-	/**
-	 * Gets the value associated with the edge. The value may be null if the iteration was initialized with
-	 * an edge data set without edge values.
-	 * Typical examples of edge values are weights or distances of the path represented by the edge.
-	 *  
-	 * @return The value associated with the edge.
-	 */
-	public EdgeValue edgeValue() {
-		return edgeValue;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
deleted file mode 100644
index bb84cea..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.Validate;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.DeltaIteration;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.TypeInformation;
-import org.apache.flink.util.Collector;
-
-/**
- * This class represents iterative graph computations, programmed in a vertex-centric perspective.
- * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The paradigm has also been
- * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
- * <p>
- * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The 
- * algorithms send messages along the edges and update the state of vertices based on
- * the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates it state any more.
- * Additionally, a maximum number of iterations (supersteps) may be specified.
- * <p>
- * The computation is here represented by two functions:
- * <ul>
- *   <li>The {@link VertexUpdateFunction} receives incoming messages and may updates the state for
- *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
- *   considered updated.</li>
- *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
- *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
- * </ul>
- * <p>
- * Vertex-centric graph iterations are instantiated by the
- * {@link #withPlainEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, or the
- * {@link #withValuedEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, depending on whether
- * the graph's edges are carrying values.
- *
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- */
-public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-	implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
-{
-	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
-	
-	private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
-	
-	private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
-	
-	private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
-	
-	private final Map<String, Aggregator<?>> aggregators;
-	
-	private final int maximumNumberOfIterations;
-	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
-	private final TypeInformation<Message> messageType;
-	
-	private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
-	
-	private String name;
-	
-	private int parallelism = -1;
-		
-	// ----------------------------------------------------------------------------------
-	
-	private  VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-			int maximumNumberOfIterations)
-	{
-		Validate.notNull(uf);
-		Validate.notNull(mf);
-		Validate.notNull(edgesWithoutValue);
-		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		// check that the edges are actually a valid tuple set of vertex key types
-		TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType();
-		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples.");
-		
-		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-			"Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
-		
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
-		this.edgesWithoutValue = edgesWithoutValue;
-		this.edgesWithValue = null;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
-		this.messageType = getMessageType(mf);
-	}
-	
-	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, 
-			int maximumNumberOfIterations,
-			boolean edgeHasValueMarker)
-	{
-		Validate.notNull(uf);
-		Validate.notNull(mf);
-		Validate.notNull(edgesWithValue);
-		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		// check that the edges are actually a valid tuple set of vertex key types
-		TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType();
-		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples.");
-		
-		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-			"The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
-		
-		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
-		this.edgesWithoutValue = null;
-		this.edgesWithValue = edgesWithValue;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
-		this.messageType = getMessageType(mf);
-	}
-	
-	private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
-		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
-	}
-	
-	/**
-	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
-	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
-	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
-	 * 
-	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
-	 * @param aggregator The aggregator.
-	 */
-	public void registerAggregator(String name, Aggregator<?> aggregator) {
-		this.aggregators.put(name, aggregator);
-	}
-	
-	/**
-	 * Adds a data set as a broadcast set to the messaging function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the messaging function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
-		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Adds a data set as a broadcast set to the vertex update function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the vertex update function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
-		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-	
-	/**
-	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
-	 * 
-	 * @param name The name for the iteration.
-	 */
-	public void setName(String name) {
-		this.name = name;
-	}
-	
-	/**
-	 * Gets the name from this vertex-centric iteration.
-	 * 
-	 * @return The name of the iteration.
-	 */
-	public String getName() {
-		return name;
-	}
-	
-	/**
-	 * Sets the degree of parallelism for the iteration.
-	 * 
-	 * @param parallelism The degree of parallelism.
-	 */
-	public void setParallelism(int parallelism) {
-		Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
-		this.parallelism = parallelism;
-	}
-	
-	/**
-	 * Gets the iteration's degree of parallelism.
-	 * 
-	 * @return The iterations parallelism, or -1, if not set.
-	 */
-	public int getParallelism() {
-		return parallelism;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom Operator behavior
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Sets the input data set for this operator. In the case of this operator this input data set represents
-	 * the set of vertices with their initial state.
-	 * 
-	 * @param inputData The input data set, which in the case of this operator represents the set of
-	 *                  vertices with their initial state.
-	 * 
-	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
-	 */
-	@Override
-	public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
-		// sanity check that we really have two tuples
-		TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType();
-		Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples.");
-
-		// check that the key type here is the same as for the edges
-		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
-		TypeInformation<?> edgeType = edgesWithoutValue != null ? edgesWithoutValue.getType() : edgesWithValue.getType();
-		TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
-		
-		Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) " +
-				"must be the same data type as the first fields of the edge data set (the source vertex id). " +
-				"Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.", keyType, edgeKeyType);
-
-		this.initialVertices = inputData;
-	}
-	
-	/**
-	 * Creates the operator that represents this vertex-centric graph computation.
-	 * 
-	 * @return The operator that represents this vertex-centric graph computation.
-	 */
-	@Override
-	public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
-		if (this.initialVertices == null) {
-			throw new IllegalStateException("The input data set has not been set.");
-		}
-		
-		// prepare some type information
-		TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
-		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
-		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);		
-		
-		// set up the iteration operator
-		final String name = (this.name != null) ? this.name :
-			"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
-		final int[] zeroKeyPos = new int[] {0};
-	
-		final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration =
-			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
-		iteration.name(name);
-		iteration.parallelism(parallelism);
-		
-		// register all aggregators
-		for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
-			iteration.registerAggregator(entry.getKey(), entry.getValue());
-		}
-		
-		// build the messaging function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-		if (edgesWithoutValue != null) {
-			MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(messagingFunction, messageTypeInfo);
-			messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		}
-		else {
-			MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
-			messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		}
-		
-		// configure coGroup message function with name and broadcast variables
-		messages = messages.name("Messaging");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
-			messages = messages.withBroadcastSet(e.f1, e.f0);
-		}
-		
-		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
-		
-		// build the update function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
-				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-		
-		// configure coGroup update function with name and broadcast variables
-		updates = updates.name("Vertex State Updates");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
-			updates = updates.withBroadcastSet(e.f1, e.f0);
-		}
-
-		// let the operator know that we preserve the key field
-		updates.withConstantSetFirst("0").withConstantSetSecond("0");
-		
-		return iteration.closeWith(updates, updates);
-		
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// Constructor builders to avoid signature conflicts with generic type erasure
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new vertex-centric iteration operator for graphs where the edges are not associated with a value.
-	 * 
-	 * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
-	 * @param vertexUpdateFunction The function that updates the state of the vertices from the incoming messages.
-	 * @param messagingFunction The function that turns changed vertex states into messages along the edges.
-	 * 
-	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
-	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
-	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * 
-	 * @return An in stance of the vertex-centric graph computation operator.
-	 */
-	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message>
-			VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(
-					DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-						VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-						MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
-						int maximumNumberOfIterations)
-	{
-		@SuppressWarnings("unchecked")
-		MessagingFunction<VertexKey, VertexValue, Message, Object> tmf = 
-								(MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
-		
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations);
-	}
-	
-	/**
-	 * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
-	 * a weight or distance).
-	 * 
-	 * @param edgesWithValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
-	 * @param uf The function that updates the state of the vertices from the incoming messages.
-	 * @param mf The function that turns changed vertex states into messages along the edges.
-	 * 
-	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
-	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
-	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * @param <EdgeValue> The type of the values that are associated with the edges.
-	 * 
-	 * @return An in stance of the vertex-centric graph computation operator.
-	 */
-	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
-			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(
-					DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
-					VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-					MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-					int maximumNumberOfIterations)
-	{
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Wrapping UDFs
-	// --------------------------------------------------------------------------------------------
-	
-	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
-		extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
-
-		private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
-		
-		private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;
-		
-		
-		private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-				TypeInformation<Tuple2<VertexKey, VertexValue>> resultType)
-		{
-			this.vertexUpdateFunction = vertexUpdateFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public void coGroup(Iterator<Tuple2<VertexKey, Message>> messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex,
-				Collector<Tuple2<VertexKey, VertexValue>> out)
-			throws Exception
-		{
-			if (vertex.hasNext()) {
-				Tuple2<VertexKey, VertexValue> vertexState = vertex.next();
-				
-				@SuppressWarnings("unchecked")
-				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages;
-				messageIter.setSource(downcastIter);
-				
-				vertexUpdateFunction.setOutput(vertexState, out);
-				vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
-			} else {
-				if (messages.hasNext()) {
-					String message = "Target vertex does not exist!.";
-					try {
-						Tuple2<VertexKey, Message> next = messages.next();
-						message = "Target vertex '" + next.f0 + "' does not exist!.";
-					} catch (Throwable t) {}
-					throw new Exception(message);
-				} else {
-					throw new Exception();
-				}
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.vertexUpdateFunction.init(getIterationRuntimeContext());
-			}
-			this.vertexUpdateFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.vertexUpdateFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
-			return this.resultType;
-		}
-	}
-	
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have no associated values.
-	 */
-	private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
-		extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-		
-		
-		private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-		
-		@Override
-		public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> edges,
-				Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
-			throws Exception
-		{
-			if (state.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = state.next();
-				messagingFunction.set((Iterator<?>) edges, out);
-				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext(), false);
-			}
-			
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-	
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
-	 */
-	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-		extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-		
-		
-		private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
-				Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
-			throws Exception
-		{
-			if (state.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = state.next();
-				messagingFunction.set((Iterator<?>) edges, out);
-				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext(), true);
-			}
-			
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-		
-		@Override
-		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-}