You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:01 UTC
[61/92] [abbrv] prefix all projects in addons and quickstarts with
flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index fcd0606..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc.example;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * Stand-alone example for the JDBC connector.
- *
- * NOTE: To run this example, you need the apache derby code in your classpath.
- * See the Maven file (pom.xml) for a reference to the derby dependency. You can
- * simply Change the scope of the Maven dependency from test to compile.
- */
-public class JDBCExample implements Program, ProgramDescription {
-
- @Override
- public Plan getPlan(String[] args) {
- /*
- * In this example we use the constructor where the url contains all the settings that are needed.
- * You could also use the default constructor and deliver a Configuration with all the needed settings.
- * You also could set the settings to the source-instance.
- */
- GenericDataSource<JDBCInputFormat> source = new GenericDataSource<JDBCInputFormat>(
- new JDBCInputFormat(
- "org.apache.derby.jdbc.EmbeddedDriver",
- "jdbc:derby:memory:ebookshop",
- "select * from books"),
- "Data Source");
-
- GenericDataSink sink = new GenericDataSink(new JDBCOutputFormat(), "Data Output");
- JDBCOutputFormat.configureOutputFormat(sink)
- .setDriver("org.apache.derby.jdbc.EmbeddedDriver")
- .setUrl("jdbc:derby:memory:ebookshop")
- .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
- .setClass(IntValue.class)
- .setClass(StringValue.class)
- .setClass(StringValue.class)
- .setClass(FloatValue.class)
- .setClass(IntValue.class);
-
- sink.addInput(source);
- return new Plan(sink, "JDBC Example Job");
- }
-
- @Override
- public String getDescription() {
- return "Parameter:";
- }
-
- /*
- * To run this example, you need the apache derby code in your classpath!
- */
- public static void main(String[] args) throws Exception {
-
- prepareTestDb();
- JDBCExample tut = new JDBCExample();
- JobExecutionResult res = LocalExecutor.execute(tut, args);
- System.out.println("runtime: " + res.getNetRuntime());
-
- System.exit(0);
- }
-
- private static void prepareTestDb() throws Exception {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- Connection conn = DriverManager.getConnection(dbURL);
-
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
-
- conn.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index 5816fa8..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import junit.framework.Assert;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCInputFormatTest {
- JDBCInputFormat jdbcInputFormat;
-
- static Connection conn;
-
- static final Object[][] dbData = {
- {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
- {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
- {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
- {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
- {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
- @BeforeClass
- public static void setUpClass() {
- try {
- prepareDerbyDatabase();
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
- System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTable();
- insertDataToSQLTable();
- conn.close();
- }
-
- private static void createTable() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void insertDataToSQLTable() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- Statement stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
- }
-
- @AfterClass
- public static void tearDownClass() {
- cleanUpDerbyDatabases();
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
- conn = DriverManager.getConnection(dbURL);
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE books");
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- @After
- public void tearDown() {
- jdbcInputFormat = null;
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDriver() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl("jdbc:derby:memory:ebookshop")
- .setQuery("select * from books")
- .finish();
- jdbcInputFormat.open(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidURL() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
- .setQuery("select * from books")
- .finish();
- jdbcInputFormat.open(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setDBUrl("jdbc:derby:memory:ebookshop")
- .setQuery("iamnotsql")
- .finish();
- jdbcInputFormat.open(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testIncompleteConfiguration() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setQuery("select * from books")
- .finish();
- }
-
- @Test(expected = IOException.class)
- public void testIncompatibleTuple() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setDBUrl("jdbc:derby:memory:ebookshop")
- .setQuery("select * from books")
- .finish();
- jdbcInputFormat.open(null);
- jdbcInputFormat.nextRecord(new Tuple2());
- }
-
- @Test
- public void testJDBCInputFormat() throws IOException {
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setDBUrl("jdbc:derby:memory:ebookshop")
- .setQuery("select * from books")
- .finish();
- jdbcInputFormat.open(null);
- Tuple5 tuple = new Tuple5();
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(tuple);
- Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
- Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
- Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
- Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
- Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
-
- for (int x = 0; x < 5; x++) {
- Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
- }
- recordCount++;
- }
- Assert.assertEquals(5, recordCount);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index c1c899e..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import junit.framework.Assert;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
- private JDBCInputFormat jdbcInputFormat;
- private JDBCOutputFormat jdbcOutputFormat;
-
- private static Connection conn;
-
- static final Object[][] dbData = {
- {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
- {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
- {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
- {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
- {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
- @BeforeClass
- public static void setUpClass() throws SQLException {
- try {
- System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
- prepareDerbyDatabase();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTable("books");
- createTable("newbooks");
- insertDataToSQLTables();
- conn.close();
- }
-
- private static void createTable(String tableName) throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
- sqlQueryBuilder.append(tableName);
- sqlQueryBuilder.append(" (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void insertDataToSQLTables() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- Statement stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
- }
-
- @AfterClass
- public static void tearDownClass() {
- cleanUpDerbyDatabases();
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
- conn = DriverManager.getConnection(dbURL);
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE books");
- stat.executeUpdate("DROP TABLE newbooks");
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- @After
- public void tearDown() {
- jdbcOutputFormat = null;
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDriver() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl("jdbc:derby:memory:ebookshop")
- .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
- .finish();
- jdbcOutputFormat.open(0, 1);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidURL() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
- .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
- .finish();
- jdbcOutputFormat.open(0, 1);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setDBUrl("jdbc:derby:memory:ebookshop")
- .setQuery("iamnotsql")
- .finish();
- jdbcOutputFormat.open(0, 1);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testIncompleteConfiguration() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
- .finish();
- }
-
- @Test(expected = IOException.class)
- public void testIncompatibleTuple() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setDBUrl("jdbc:derby:memory:ebookshop")
- .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
- .finish();
- jdbcOutputFormat.open(0, 1);
-
- Tuple3 tuple3 = new Tuple3();
- tuple3.setField(4, 0);
- tuple3.setField("hi", 1);
- tuple3.setField(4.4, 2);
-
- jdbcOutputFormat.writeRecord(tuple3);
- jdbcOutputFormat.close();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testIncompatibleTypes() throws IOException {
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- .setDBUrl("jdbc:derby:memory:ebookshop")
- .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
- .finish();
- jdbcOutputFormat.open(0, 1);
-
- Tuple5 tuple5 = new Tuple5();
- tuple5.setField(4, 0);
- tuple5.setField("hello", 1);
- tuple5.setField("world", 2);
- tuple5.setField(0.99, 3);
- tuple5.setField("imthewrongtype", 4);
-
- jdbcOutputFormat.writeRecord(tuple5);
- jdbcOutputFormat.close();
- }
-
- @Test
- public void testJDBCOutputFormat() throws IOException {
- String sourceTable = "books";
- String targetTable = "newbooks";
- String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
- String dbUrl = "jdbc:derby:memory:ebookshop";
-
- jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDBUrl(dbUrl)
- .setDrivername(driverPath)
- .setQuery("insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)")
- .finish();
- jdbcOutputFormat.open(0, 1);
-
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(driverPath)
- .setDBUrl(dbUrl)
- .setQuery("select * from " + sourceTable)
- .finish();
- jdbcInputFormat.open(null);
-
- Tuple5 tuple = new Tuple5();
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(tuple);
- jdbcOutputFormat.writeRecord(tuple);
- }
-
- jdbcOutputFormat.close();
- jdbcInputFormat.close();
-
- jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(driverPath)
- .setDBUrl(dbUrl)
- .setQuery("select * from " + targetTable)
- .finish();
- jdbcInputFormat.open(null);
-
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(tuple);
- Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
- Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
- Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
- Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
- Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
-
- for (int x = 0; x < 5; x++) {
- Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
- }
-
- recordCount++;
- }
- Assert.assertEquals(5, recordCount);
-
- jdbcInputFormat.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
deleted file mode 100644
index 3032728..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.OutputStream;
-
-public class DevNullLogStream {
-
- public static final OutputStream DEV_NULL = new OutputStream() {
- public void write(int b) {}
- };
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index 1bafb42..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
-* http://www.apache.org/licenses/LICENSE-2.0
- *
-* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import junit.framework.Assert;
-
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCInputFormatTest {
- JDBCInputFormat jdbcInputFormat;
- Configuration config;
- static Connection conn;
- static final Value[][] dbData = {
- {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
- {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
- {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
- {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
- {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
- @BeforeClass
- public static void setUpClass() {
- try {
- prepareDerbyDatabase();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyDatabase() throws ClassNotFoundException {
- System.setProperty("derby.stream.error.field","org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- createConnection(dbURL);
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE books");
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- /*
- Loads JDBC derby driver ; creates(if necessary) and populates database.
- */
- private static void createConnection(String dbURL) {
- try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTable();
- insertDataToSQLTables();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void createTable() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("CREATE TABLE bookscontent (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("content BLOB(10K) DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void insertDataToSQLTables() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- Statement stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("INSERT INTO bookscontent (id, title, content) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', CAST(X'7f454c4602' AS BLOB))");
-
- stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
- }
-
-
- @After
- public void tearDown() {
- jdbcInputFormat = null;
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidConnection() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:idontexist", "select * from books;");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "abc");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDBType() {
- jdbcInputFormat = new JDBCInputFormat("idontexist.Driver", "jdbc:derby:memory:ebookshop", "select * from books;");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testUnsupportedSQLType() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from bookscontent");
- jdbcInputFormat.configure(null);
- jdbcInputFormat.nextRecord(new Record());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNotConfiguredFormatNext() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.nextRecord(new Record());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNotConfiguredFormatEnd() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.reachedEnd();
- }
-
- @Test
- public void testJDBCInputFormat() throws IOException {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.configure(null);
- Record record = new Record();
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- Assert.assertEquals(5, record.getNumFields());
- Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
- Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
- Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
- Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
- Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
- int[] pos = {0, 1, 2, 3, 4};
- Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
- Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
- recordCount++;
- }
- Assert.assertEquals(5, recordCount);
-
- cleanUpDerbyDatabases();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index 10ca85d..0000000
--- a/flink-addons/jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import junit.framework.Assert;
-
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
- private JDBCInputFormat jdbcInputFormat;
- private JDBCOutputFormat jdbcOutputFormat;
-
- private static Connection conn;
-
- static final Value[][] dbData = {
- {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
- {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
- {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
- {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
- {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
- @BeforeClass
- public static void setUpClass() {
- try {
- System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
- prepareDerbyInputDatabase();
- prepareDerbyOutputDatabase();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE books");
- stat.executeUpdate("DROP TABLE newbooks");
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyInputDatabase() throws ClassNotFoundException {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTableBooks();
- insertDataToSQLTables();
- conn.close();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- } catch (SQLException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyOutputDatabase() throws ClassNotFoundException {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTableNewBooks();
- conn.close();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- } catch (SQLException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void createTableBooks() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void createTableNewBooks() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void insertDataToSQLTables() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- Statement stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
- }
-
-
- @After
- public void tearDown() {
- jdbcOutputFormat = null;
- cleanUpDerbyDatabases();
- }
-
- @Test
- public void testJDBCOutputFormat() throws IOException {
- String sourceTable = "books";
- String targetTable = "newbooks";
- String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
- String dbUrl = "jdbc:derby:memory:ebookshop";
-
- Configuration cfg = new Configuration();
- cfg.setString("driver", driverPath);
- cfg.setString("url", dbUrl);
- cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)");
- cfg.setInteger("fields", 5);
- cfg.setClass("type0", IntValue.class);
- cfg.setClass("type1", StringValue.class);
- cfg.setClass("type2", StringValue.class);
- cfg.setClass("type3", FloatValue.class);
- cfg.setClass("type4", IntValue.class);
-
- jdbcOutputFormat = new JDBCOutputFormat();
- jdbcOutputFormat.configure(cfg);
- jdbcOutputFormat.open(0,1);
-
- jdbcInputFormat = new JDBCInputFormat(
- driverPath,
- dbUrl,
- "select * from " + sourceTable);
- jdbcInputFormat.configure(null);
-
- Record record = new Record();
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- jdbcOutputFormat.writeRecord(record);
- }
-
- jdbcOutputFormat.close();
- jdbcInputFormat.close();
-
- jdbcInputFormat = new JDBCInputFormat(
- driverPath,
- dbUrl,
- "select * from " + targetTable);
- jdbcInputFormat.configure(null);
-
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- Assert.assertEquals(5, record.getNumFields());
- Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
- Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
- Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
- Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
- Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
- int[] pos = {0, 1, 2, 3, 4};
- Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
- Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
- recordCount++;
- }
- Assert.assertEquals(5, recordCount);
-
- jdbcInputFormat.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml
index d9fcfb0..e10df81 100644
--- a/flink-addons/pom.xml
+++ b/flink-addons/pom.xml
@@ -29,10 +29,10 @@
<packaging>pom</packaging>
<modules>
- <module>avro</module>
- <module>jdbc</module>
- <module>spargel</module>
- <module>hadoop-compatibility</module>
+ <module>flink-avro</module>
+ <module>flink-jdbc</module>
+ <module>flink-spargel</module>
+ <module>flink-hadoop-compatibility</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->
@@ -58,7 +58,7 @@
</property>
</activation>
<modules>
- <module>hbase</module>
+ <module>flink-hbase</module>
</modules>
</profile>
<profile>
@@ -70,7 +70,7 @@
</property>
</activation>
<modules>
- <module>yarn</module>
+ <module>flink-yarn</module>
</modules>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/pom.xml b/flink-addons/spargel/pom.xml
deleted file mode 100644
index 150cb37..0000000
--- a/flink-addons/spargel/pom.xml
+++ /dev/null
@@ -1,55 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>flink-addons</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>0.6-incubating-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>spargel</artifactId>
- <name>spargel</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
deleted file mode 100644
index 3e1930c..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
- * the <i>foreach</i> syntax.
- */
-public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
- private static final long serialVersionUID = 1L;
-
- private transient Iterator<Tuple2<?, Message>> source;
-
-
- final void setSource(Iterator<Tuple2<?, Message>> source) {
- this.source = source;
- }
-
- @Override
- public final boolean hasNext() {
- return this.source.hasNext();
- }
-
- @Override
- public final Message next() {
- return this.source.next().f1;
- }
-
- @Override
- public final void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<Message> iterator() {
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
deleted file mode 100644
index 1b5cbde..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
- *
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // --------------------------------------------------------------------------------------------
- // Public API Methods
- // --------------------------------------------------------------------------------------------
-
- /**
- * This method is invoked once per superstep for each vertex that was changed in that superstep.
- * It needs to produce the messages that will be received by vertices in the next superstep.
- *
- * @param vertexKey The key of the vertex that was changed.
- * @param vertexValue The value (state) of the vertex that was changed.
- *
- * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
- */
- public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
-
- /**
- * This method is executed once per superstep before the vertex update function is invoked for each vertex.
- *
- * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
- */
- public void preSuperstep() throws Exception {}
-
- /**
- * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
- *
- * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
- */
- public void postSuperstep() throws Exception {}
-
-
- /**
- * Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
- * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
- *
- * @return An iterator with all outgoing edges.
- */
- @SuppressWarnings("unchecked")
- public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
- if (edgesUsed) {
- throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
- }
- edgesUsed = true;
-
- if (this.edgeWithValueIter != null) {
- this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges);
- return this.edgeWithValueIter;
- } else {
- this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
- return this.edgeNoValueIter;
- }
- }
-
- /**
- * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
- * This method is mutually exclusive to the method {@link #getOutgoingEdges()} and may be called only once.
- *
- * @param m The message to send.
- */
- public void sendMessageToAllNeighbors(Message m) {
- if (edgesUsed) {
- throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
- }
-
- edgesUsed = true;
-
- outValue.f1 = m;
-
- while (edges.hasNext()) {
- Tuple next = (Tuple) edges.next();
- VertexKey k = next.getField(1);
- outValue.f0 = k;
- out.collect(outValue);
- }
- }
-
- /**
- * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
- * the next superstep will cause an exception due to a non-deliverable message.
- *
- * @param target The key (id) of the target vertex to message.
- * @param m The message.
- */
- public void sendMessageTo(VertexKey target, Message m) {
- outValue.f0 = target;
- outValue.f1 = m;
- out.collect(outValue);
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the number of the superstep, starting at <tt>1</tt>.
- *
- * @return The number of the current superstep.
- */
- public int getSuperstepNumber() {
- return this.runtimeContext.getSuperstepNumber();
- }
-
- /**
- * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
- * all aggregates globally once per superstep and makes them available in the next superstep.
- *
- * @param name The name of the aggregator.
- * @return The aggregator registered under this name, or null, if no aggregator was registered.
- */
- public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
- }
-
- /**
- * Get the aggregated value that an aggregator computed in the previous iteration.
- *
- * @param name The name of the aggregator.
- * @return The aggregated value of the previous iteration.
- */
- public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
- }
-
- /**
- * Gets the broadcast data set registered under the given name. Broadcast data sets
- * are available on all parallel instances of a function. They can be registered via
- * {@link VertexCentricIteration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
- *
- * @param name The name under which the broadcast set is registered.
- * @return The broadcast data set.
- */
- public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
- }
-
- // --------------------------------------------------------------------------------------------
- // internal methods and state
- // --------------------------------------------------------------------------------------------
-
- private Tuple2<VertexKey, Message> outValue;
-
- private IterationRuntimeContext runtimeContext;
-
- private Iterator<?> edges;
-
- private Collector<Tuple2<VertexKey, Message>> out;
-
- private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
-
- private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;
-
- private boolean edgesUsed;
-
-
- void init(IterationRuntimeContext context, boolean hasEdgeValue) {
- this.runtimeContext = context;
- this.outValue = new Tuple2<VertexKey, Message>();
-
- if (hasEdgeValue) {
- this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
- } else {
- this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
- }
- }
-
- void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
- this.edges = edges;
- this.out = out;
- this.edgesUsed = false;
- }
-
-
-
- private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue>
- implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
- {
- private Iterator<Tuple2<VertexKey, VertexKey>> input;
-
- private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
-
-
- void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
- this.input = input;
- }
-
- @Override
- public boolean hasNext() {
- return input.hasNext();
- }
-
- @Override
- public OutgoingEdge<VertexKey, EdgeValue> next() {
- Tuple2<VertexKey, VertexKey> next = input.next();
- edge.set(next.f1, null);
- return edge;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
- return this;
- }
- }
-
-
- private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue>
- implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
- {
- private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
-
- private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
-
- void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) {
- this.input = input;
- }
-
- @Override
- public boolean hasNext() {
- return input.hasNext();
- }
-
- @Override
- public OutgoingEdge<VertexKey, EdgeValue> next() {
- Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next();
- edge.set(next.f1, next.f2);
- return edge;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- @Override
- public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
- return this;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
deleted file mode 100644
index aef9d0b..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-/**
- * <tt>Edge</tt> objects represent edges between vertices. Edges are defined by their source and target
- * vertex id. Edges may have an associated value (for example a weight or a distance), if the
- * graph algorithm was initialized with the
- * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
- * method.
- *
- * @param <VertexKey> The type of the vertex key.
- * @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold
- * value, this type may be arbitrary.
- */
-public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private VertexKey target;
-
- private EdgeValue edgeValue;
-
- void set(VertexKey target, EdgeValue edgeValue) {
- this.target = target;
- this.edgeValue = edgeValue;
- }
-
- /**
- * Gets the target vertex id.
- *
- * @return The target vertex id.
- */
- public VertexKey target() {
- return target;
- }
-
- /**
- * Gets the value associated with the edge. The value may be null if the iteration was initialized with
- * an edge data set without edge values.
- * Typical examples of edge values are weights or distances of the path represented by the edge.
- *
- * @return The value associated with the edge.
- */
- public EdgeValue edgeValue() {
- return edgeValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
deleted file mode 100644
index bb84cea..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.Validate;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.DeltaIteration;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.TypeInformation;
-import org.apache.flink.util.Collector;
-
-/**
- * This class represents iterative graph computations, programmed in a vertex-centric perspective.
- * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
- * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
- * <p>
- * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The
- * algorithms send messages along the edges and update the state of vertices based on
- * the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates its state any more.
- * Additionally, a maximum number of iterations (supersteps) may be specified.
- * <p>
- * The computation is here represented by two functions:
- * <ul>
- * <li>The {@link VertexUpdateFunction} receives incoming messages and may update the state for
- * the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
- * considered updated.</li>
- * <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
- * edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
- * </ul>
- * <p>
- * Vertex-centric graph iterations are instantiated by the
- * {@link #withPlainEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, or the
- * {@link #withValuedEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, depending on whether
- * the graph's edges are carrying values.
- *
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- */
-public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
- implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
-{
- private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
-
- private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
-
- private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
-
- private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
-
- private final Map<String, Aggregator<?>> aggregators;
-
- private final int maximumNumberOfIterations;
-
- private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-
- private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-
- private final TypeInformation<Message> messageType;
-
- private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
-
- private String name;
-
- private int parallelism = -1;
-
- // ----------------------------------------------------------------------------------
-
- private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
- MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
- DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
- int maximumNumberOfIterations)
- {
- Validate.notNull(uf);
- Validate.notNull(mf);
- Validate.notNull(edgesWithoutValue);
- Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-
- // check that the edges are actually a valid tuple set of vertex key types
- TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType();
- Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples.");
-
- TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
- Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
- && Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
- "Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
-
- this.updateFunction = uf;
- this.messagingFunction = mf;
- this.edgesWithoutValue = edgesWithoutValue;
- this.edgesWithValue = null;
- this.maximumNumberOfIterations = maximumNumberOfIterations;
- this.aggregators = new HashMap<String, Aggregator<?>>();
-
- this.messageType = getMessageType(mf);
- }
-
- /**
- * Creates an iteration over a graph whose edges carry a value.
- * All arguments are validated before any field is assigned.
- *
- * @param uf The vertex update function; must not be null.
- * @param mf The messaging function; must not be null.
- * @param edgesWithValue The edges as (source-id, target-id, edge-value) 3-tuples; must not be null.
- * @param maximumNumberOfIterations The upper bound for the number of iterations; must be at least one.
- * @param edgeHasValueMarker Never read. It only gives this constructor a signature that differs from the
- * constructor for edges without values once the generic types are erased.
- */
- private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
- MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
- DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
- int maximumNumberOfIterations,
- boolean edgeHasValueMarker)
- {
- Validate.notNull(uf);
- Validate.notNull(mf);
- Validate.notNull(edgesWithValue);
- Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-
- // check that the edges are actually a valid tuple set of vertex key types
- TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType();
- Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples.");
-
- // only the first two fields (the vertex ids) are checked; the edge value type is unconstrained
- TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
- Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
- && Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
- "The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
-
- this.updateFunction = uf;
- this.messagingFunction = mf;
- // exactly one of the two edge data sets is non-null; here it is the one with values
- this.edgesWithoutValue = null;
- this.edgesWithValue = edgesWithValue;
- this.maximumNumberOfIterations = maximumNumberOfIterations;
- this.aggregators = new HashMap<String, Aggregator<?>>();
-
- this.messageType = getMessageType(mf);
- }
-
- private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
- return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
- }
-
- /**
- * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
- * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
- * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
- * <p>
- * The aggregators are kept in a map keyed by name, so registering a second aggregator under an already
- * used name replaces the earlier one. Neither argument is validated here.
- *
- * @param name The name of the aggregator, used to retrieve it and its aggregates during execution.
- * @param aggregator The aggregator.
- */
- public void registerAggregator(String name, Aggregator<?> aggregator) {
- this.aggregators.put(name, aggregator);
- }
-
- /**
- * Adds a data set as a broadcast set to the messaging function.
- * <p>
- * The pair is appended to a list; no check for duplicate names or null arguments is made here.
- *
- * @param name The name under which the broadcast data is available in the messaging function.
- * @param data The data set to be broadcasted.
- */
- public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
- this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
- }
-
- /**
- * Adds a data set as a broadcast set to the vertex update function.
- * <p>
- * The pair is appended to a list; no check for duplicate names or null arguments is made here.
- *
- * @param name The name under which the broadcast data is available in the vertex update function.
- * @param data The data set to be broadcasted.
- */
- public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
- this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
- }
-
- /**
- * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
- * The value is stored as given; passing null clears a previously set name.
- *
- * @param name The name for the iteration.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Gets the name from this vertex-centric iteration.
- *
- * @return The name of the iteration, or null if no name has been set.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Defines with which degree of parallelism the iteration runs.
- *
- * @param parallelism A positive degree of parallelism, or -1 to use the default.
- */
- public void setParallelism(int parallelism) {
- final boolean useDefault = parallelism == -1;
- final boolean positive = parallelism > 0;
- Validate.isTrue(positive || useDefault, "The degree of parallelism must be positive, or -1 (use default).");
-
- this.parallelism = parallelism;
- }
-
- /**
- * Gets the iteration's degree of parallelism.
- *
- * @return The iteration's parallelism, or -1 if it has not been set (the field's initial value).
- */
- public int getParallelism() {
- return parallelism;
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom Operator behavior
- // --------------------------------------------------------------------------------------------
-
- /**
- * Supplies the vertices, together with their initial state, on which this iteration operates.
- * The data set must consist of 2-tuples whose first field has the same type as the
- * first field (the source vertex id) of the edge data set.
- *
- * @param inputData The set of vertices with their initial state.
- *
- * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
- */
- @Override
- public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
- // the vertices have to be typed as 2-tuples
- final TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType();
- final boolean isPair = inputType.isTupleType() && inputType.getArity() == 2;
- Validate.isTrue(isPair, "The input data set (the initial vertices) must consist of 2-tuples.");
-
- final TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
-
- // use whichever of the two edge data sets this iteration was created with
- final TypeInformation<?> edgeType;
- if (edgesWithoutValue != null) {
- edgeType = edgesWithoutValue.getType();
- } else {
- edgeType = edgesWithValue.getType();
- }
- final TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
-
- // the vertex id type has to agree with the source id type of the edges
- final boolean sameKeyType = keyType.equals(edgeKeyType);
- Validate.isTrue(sameKeyType, "The first tuple field (the vertex id) of the input data set (the initial vertices) " +
- "must be the same data type as the first fields of the edge data set (the source vertex id). " +
- "Here, the key type for the vertex ids is '%s' and the key type for the edges is '%s'.", keyType, edgeKeyType);
-
- this.initialVertices = inputData;
- }
-
- /**
- * Creates the operator that represents this vertex-centric graph computation.
- * <p>
- * The computation is assembled as a delta iteration over the initial vertices. Inside the iteration, a first
- * co-group of the edges with the workset produces the messages, and a second co-group of the messages
- * with the solution set produces the vertex updates. Both co-groups use tuple field 0 as the key.
- *
- * @return The operator that represents this vertex-centric graph computation.
- *
- * @throws IllegalStateException Thrown, if no input data set (the initial vertices) has been set.
- */
- @Override
- public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
- if (this.initialVertices == null) {
- throw new IllegalStateException("The input data set has not been set.");
- }
-
- // prepare some type information
- TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
- TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
- // messages are (target vertex key, message) pairs
- TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
-
- // set up the iteration operator
- // if no name was set, a default name is built from the string forms of the two user functions
- final String name = (this.name != null) ? this.name :
- "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
- final int[] zeroKeyPos = new int[] {0};
-
- // the initial vertices are both the receiver and the argument of iterateDelta
- // NOTE(review): presumably they act as initial solution set and as initial workset - confirm
- final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration =
- this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
- iteration.name(name);
- // passes -1 through unchanged when no parallelism was set
- iteration.parallelism(parallelism);
-
- // register all aggregators
- for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
- iteration.registerAggregator(entry.getKey(), entry.getValue());
- }
-
- // build the messaging function (co group)
- // the wrapping UDF depends on which of the two edge data sets is present
- CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
- if (edgesWithoutValue != null) {
- MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(messagingFunction, messageTypeInfo);
- messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
- }
- else {
- MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
- messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
- }
-
- // configure coGroup message function with name and broadcast variables
- messages = messages.name("Messaging");
- for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
- // f0 is the broadcast name, f1 the broadcast data set
- messages = messages.withBroadcastSet(e.f1, e.f0);
- }
-
- VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
-
- // build the update function (co group)
- CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
- messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-
- // configure coGroup update function with name and broadcast variables
- updates = updates.name("Vertex State Updates");
- for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
- // f0 is the broadcast name, f1 the broadcast data set
- updates = updates.withBroadcastSet(e.f1, e.f0);
- }
-
- // let the operator know that we preserve the key field
- updates.withConstantSetFirst("0").withConstantSetSecond("0");
-
- // the updates are passed for both arguments of closeWith
- // NOTE(review): presumably as solution set delta and as next workset - confirm
- return iteration.closeWith(updates, updates);
-
- }
-
- // --------------------------------------------------------------------------------------------
- // Constructor builders to avoid signature conflicts with generic type erasure
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new vertex-centric iteration operator for graphs where the edges are not associated with a value.
- *
- * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
- * @param vertexUpdateFunction The function that updates the state of the vertices from the incoming messages.
- * @param messagingFunction The function that turns changed vertex states into messages along the edges.
- * @param maximumNumberOfIterations The maximum number of iterations (supersteps); must be at least one.
- *
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- *
- * @return An instance of the vertex-centric graph computation operator.
- */
- public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message>
- VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(
- DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
- VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
- MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
- int maximumNumberOfIterations)
- {
- // the edge value type is a wildcard here; it is pinned to Object so that the
- // constructor's EdgeValue type parameter can be instantiated
- @SuppressWarnings("unchecked")
- MessagingFunction<VertexKey, VertexValue, Message, Object> tmf =
- (MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
-
- return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations);
- }
-
- /**
- * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
- * a weight or distance).
- *
- * @param edgesWithValue The data set containing edges. Edges are represented as 3-tuples: (source-id, target-id, edge-value)
- * @param uf The function that updates the state of the vertices from the incoming messages.
- * @param mf The function that turns changed vertex states into messages along the edges.
- * @param maximumNumberOfIterations The maximum number of iterations (supersteps); must be at least one.
- *
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- *
- * @return An instance of the vertex-centric graph computation operator.
- */
- public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
- VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(
- DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
- VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
- MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
- int maximumNumberOfIterations)
- {
- // the trailing 'true' only selects the constructor for valued edges; its value is never read
- return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
- }
-
- // --------------------------------------------------------------------------------------------
- // Wrapping UDFs
- // --------------------------------------------------------------------------------------------
-
- /*
- * UDF that encapsulates the vertex update function. It is co-grouped over the messages and the vertex states.
- */
- private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
- extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
- implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
- {
- private static final long serialVersionUID = 1L;
-
- private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
-
- // re-used for every group; it is pointed at the current group's messages in coGroup(...)
- private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
-
- // transient, hence not part of the serialized function
- // NOTE(review): presumably only queried while the program is assembled - confirm
- private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;
-
-
- private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
- TypeInformation<Tuple2<VertexKey, VertexValue>> resultType)
- {
- this.vertexUpdateFunction = vertexUpdateFunction;
- this.resultType = resultType;
- }
-
- /**
- * Hands the messages of one key and the state of the vertex with that key to the vertex update function.
- *
- * @param messages The messages sent to the vertex.
- * @param vertex The vertex state; only the first element is used.
- * @param out The collector that is passed on to the vertex update function.
- *
- * @throws Exception Thrown, if there is no vertex state for the key, or forwarded from the update function.
- */
- @Override
- public void coGroup(Iterator<Tuple2<VertexKey, Message>> messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex,
- Collector<Tuple2<VertexKey, VertexValue>> out)
- throws Exception
- {
- if (vertex.hasNext()) {
- Tuple2<VertexKey, VertexValue> vertexState = vertex.next();
-
- // the message iterator only looks at the message field, so the key type can be dropped
- @SuppressWarnings("unchecked")
- Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages;
- messageIter.setSource(downcastIter);
-
- vertexUpdateFunction.setOutput(vertexState, out);
- vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
- } else {
- if (messages.hasNext()) {
- String message = "Target vertex does not exist!.";
- try {
- Tuple2<VertexKey, Message> next = messages.next();
- message = "Target vertex '" + next.f0 + "' does not exist!.";
- } catch (Exception ignored) {
- // Best effort only: if the message cannot be read, the generic text above is reported.
- // Errors are deliberately not caught here, so they are no longer silently dropped.
- }
- throw new Exception(message);
- } else {
- // neither a vertex state nor a message for this key: report it with a descriptive text
- throw new Exception("The vertex update was invoked with neither a vertex state nor any message.");
- }
- }
- }
-
- /**
- * Initializes the vertex update function in the first superstep and runs its pre-superstep hook.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.vertexUpdateFunction.init(getIterationRuntimeContext());
- }
- this.vertexUpdateFunction.preSuperstep();
- }
-
- /**
- * Runs the post-superstep hook of the vertex update function.
- */
- @Override
- public void close() throws Exception {
- this.vertexUpdateFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
- return this.resultType;
- }
- }
-
- /*
- * Wraps the messaging function for graphs whose edges are plain (source-id, target-id) pairs.
- * It is co-grouped over the edges and the vertex states.
- */
- private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
- extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
- implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
- {
- private static final long serialVersionUID = 1L;
-
- private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;
-
- // transient, hence not part of the serialized function
- private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-
-
- private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
- TypeInformation<Tuple2<VertexKey, Message>> resultType)
- {
- this.resultType = resultType;
- this.messagingFunction = messagingFunction;
- }
-
- /**
- * Lets the messaging function send messages for one vertex. Keys without a vertex state are skipped.
- */
- @Override
- public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> edges,
- Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
- throws Exception
- {
- if (!state.hasNext()) {
- return;
- }
-
- final Tuple2<VertexKey, VertexValue> vertexState = state.next();
- messagingFunction.set((Iterator<?>) edges, out);
- messagingFunction.sendMessages(vertexState.f0, vertexState.f1);
- }
-
- /**
- * Initializes the messaging function in the first superstep and runs its pre-superstep hook.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
- if (firstSuperstep) {
- // NOTE(review): the 'false' presumably tells the function that the edges carry no values - confirm
- messagingFunction.init(getIterationRuntimeContext(), false);
- }
-
- messagingFunction.preSuperstep();
- }
-
- /**
- * Runs the post-superstep hook of the messaging function.
- */
- @Override
- public void close() throws Exception {
- messagingFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
- return resultType;
- }
- }
-
- /*
- * Wraps the messaging function for graphs whose edges are (source-id, target-id, edge-value) triples.
- * It is co-grouped over the edges and the vertex states.
- */
- private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
- extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
- implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
- {
- private static final long serialVersionUID = 1L;
-
- private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
-
- // transient, hence not part of the serialized function
- private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-
-
- private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
- TypeInformation<Tuple2<VertexKey, Message>> resultType)
- {
- this.resultType = resultType;
- this.messagingFunction = messagingFunction;
- }
-
- /**
- * Lets the messaging function send messages for one vertex. Keys without a vertex state are skipped.
- */
- @Override
- public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
- Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
- throws Exception
- {
- if (!state.hasNext()) {
- return;
- }
-
- final Tuple2<VertexKey, VertexValue> vertexState = state.next();
- messagingFunction.set((Iterator<?>) edges, out);
- messagingFunction.sendMessages(vertexState.f0, vertexState.f1);
- }
-
- /**
- * Initializes the messaging function in the first superstep and runs its pre-superstep hook.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
- if (firstSuperstep) {
- // NOTE(review): the 'true' presumably tells the function that the edges carry values - confirm
- messagingFunction.init(getIterationRuntimeContext(), true);
- }
-
- messagingFunction.preSuperstep();
- }
-
- /**
- * Runs the post-superstep hook of the messaging function.
- */
- @Override
- public void close() throws Exception {
- messagingFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
- return resultType;
- }
- }
-}