You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/18 16:38:16 UTC
[2/7] flink git commit: [FLINK-1962] Add Gelly Scala API
http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
new file mode 100644
index 0000000..4b8f354
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.utils.VertexToTuple2Map
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  // URI of the file the current test writes its CSV output to (assigned in before()).
+  private var resultPath: String = null
+  // Contents that file must have; assigned by each test and verified in after().
+  private var expectedResult: String = null
+
+  // Every vertex value of the test graph doubled, one "id,value" line per vertex.
+  private val doubledVertexValues: String =
+    "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  /** Exposes the temporary folder to JUnit as a rule. */
+  @Rule
+  def getFolder(): TemporaryFolder = tempFolder
+
+  /** Reserves a fresh output file inside the temporary folder. */
+  @Before
+  @throws(classOf[Exception])
+  def before(): Unit = {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  /** Compares the file written by the test, line by line, with `expectedResult`. */
+  @After
+  @throws(classOf[Exception])
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  /** Joining the vertices with tuples derived from themselves via a MapFunction doubles each value. */
+  @Test
+  @throws(classOf[Exception])
+  def testJoinWithVertexSet(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val vertexTuples = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
+    val result: Graph[Long, Long, Long] = graph.joinWithVertices(vertexTuples, new AddValuesMapper)
+    result.getVerticesAsTuple2().writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = doubledVertexValues
+  }
+
+  /** The same join, expressed through the Scala-lambda overload of `joinWithVertices`. */
+  @Test
+  @throws(classOf[Exception])
+  def testJoinWithVertexSetSugar(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
+    val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
+      (original: Long, joined: Long) => original + joined)
+    result.getVerticesAsTuple2().writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = doubledVertexValues
+  }
+
+  /** Builds the Long/Long/Long graph from the shared TestGraphUtils vertex and edge data. */
+  private def createGraph(env: ExecutionEnvironment): Graph[Long, Long, Long] =
+    Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+      TestGraphUtils.getLongLongEdgeData(env), env)
+
+  /** Adds the joined value to the original vertex value. */
+  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+    @throws(classOf[Exception])
+    def map(tuple: (Long, Long)): Long = {
+      val (vertexValue, joinedValue) = tuple
+      vertexValue + joinedValue
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
new file mode 100644
index 0000000..7e5ad14
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  // URI of the file the current test writes to; assigned in before().
+  private var resultPath: String = null
+  // What that file must contain; assigned by each test and verified in after().
+  private var expectedResult: String = null
+
+  // Expected CSV rows once every edge value has been incremented by one.
+  private val incrementedEdges: String =
+    "1,2,13\n" +
+      "1,3,14\n" +
+      "2,3,24\n" +
+      "3,4,35\n" +
+      "3,5,36\n" +
+      "4,5,46\n" +
+      "5,1,52\n"
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  /** Exposes the temporary folder to JUnit as a rule. */
+  @Rule
+  def getFolder(): TemporaryFolder = tempFolder
+
+  /** Creates the output file for the upcoming test. */
+  @Before
+  @throws(classOf[Exception])
+  def before(): Unit = {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  /** Verifies the written CSV output line by line. */
+  @After
+  @throws(classOf[Exception])
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  /** mapEdges with a MapFunction that increments every edge value. */
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val edgeTuples = buildGraph(env).mapEdges(new AddOneMapper).getEdgesAsTuple3()
+    edgeTuples.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = incrementedEdges
+  }
+
+  /** mapEdges with a Scala lambda that increments every edge value. */
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValueSugar(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val edgeTuples = buildGraph(env).mapEdges(e => e.getValue + 1).getEdgesAsTuple3()
+    edgeTuples.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = incrementedEdges
+  }
+
+  /** Builds the Long/Long/Long graph from the shared TestGraphUtils vertex and edge data. */
+  private def buildGraph(env: ExecutionEnvironment): Graph[Long, Long, Long] =
+    Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+      TestGraphUtils.getLongLongEdgeData(env), env)
+
+  /** Maps an edge to its value plus one. */
+  final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {
+    @throws(classOf[Exception])
+    def map(edge: Edge[Long, Long]): Long = edge.getValue + 1
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
new file mode 100644
index 0000000..a22cfbd
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  // URI of the file the current test writes to; assigned in before().
+  private var resultPath: String = null
+  // What that file must contain; assigned by each test and verified in after().
+  private var expectedResult: String = null
+
+  // Expected "id,value" rows once every vertex value has been incremented by one.
+  private val incrementedVertices: String =
+    "1,2\n" +
+      "2,3\n" +
+      "3,4\n" +
+      "4,5\n" +
+      "5,6\n"
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  /** Exposes the temporary folder to JUnit as a rule. */
+  @Rule
+  def getFolder(): TemporaryFolder = tempFolder
+
+  /** Creates the output file for the upcoming test. */
+  @Before
+  @throws(classOf[Exception])
+  def before(): Unit = {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  /** Verifies the written CSV output line by line. */
+  @After
+  @throws(classOf[Exception])
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  /** mapVertices with a MapFunction that increments every vertex value. */
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val mapped = buildGraph(env).mapVertices(new AddOneMapper)
+    mapped.getVerticesAsTuple2().writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = incrementedVertices
+  }
+
+  /** mapVertices with a Scala lambda that increments every vertex value. */
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValueSugar(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val mapped = buildGraph(env).mapVertices(v => v.getValue + 1)
+    mapped.getVerticesAsTuple2().writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = incrementedVertices
+  }
+
+  /** Builds the Long/Long/Long graph from the shared TestGraphUtils vertex and edge data. */
+  private def buildGraph(env: ExecutionEnvironment): Graph[Long, Long, Long] =
+    Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+      TestGraphUtils.getLongLongEdgeData(env), env)
+
+  /** Maps a vertex to its value plus one. */
+  final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {
+    @throws(classOf[Exception])
+    def map(vertex: Vertex[Long, Long]): Long = vertex.getValue + 1
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
new file mode 100644
index 0000000..6ed383a
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  // URI of the file the current test writes its CSV output to (assigned in before()).
+  private var resultPath: String = null
+  // Contents that file must have; assigned by each test and verified in after().
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  /** Exposes the temporary folder to JUnit as a rule. */
+  @Rule
+  def getFolder(): TemporaryFolder = tempFolder
+
+  /** Reserves a fresh output file for the upcoming test. */
+  @Before
+  @throws(classOf[Exception])
+  def before(): Unit = {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  /** Compares the written CSV output line by line with `expectedResult`. */
+  @After
+  @throws(classOf[Exception])
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  /** groupReduceOnEdges (ALL) with the vertex value: only vertices valued above 4 emit pairs. */
+  @Test
+  @throws(classOf[Exception])
+  def testAllNeighborsWithValueGreaterThanFour(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val result = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
+      EdgeDirection.ALL)
+    result.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "5,1\n" + "5,3\n" + "5,4"
+  }
+
+  /** groupReduceOnEdges (ALL) without the vertex value: one (vertex, neighbor) pair per edge end. */
+  @Test
+  @throws(classOf[Exception])
+  def testAllNeighbors(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val result = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
+    result.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "1,2\n" + "1,3\n" + "1,5\n" + "2,1\n" + "2,3\n" + "3,1\n" + "3,2\n" +
+      "3,4\n" + "3,5\n" + "4,3\n" + "4,5\n" + "5,1\n" + "5,3\n" + "5,4"
+  }
+
+  /** reduceOnEdges (OUT): the smallest outgoing edge value of every vertex. */
+  @Test
+  @throws(classOf[Exception])
+  def testLowestWeightOutNeighborNoValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(
+      new SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
+    verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n"
+  }
+
+  /** reduceOnEdges (IN): the smallest incoming edge value of every vertex. */
+  @Test
+  @throws(classOf[Exception])
+  def testLowestWeightInNeighborNoValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val verticesWithLowestInNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(
+      new SelectMinWeightNeighborNoValue, EdgeDirection.IN)
+    verticesWithLowestInNeighbor.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n"
+  }
+
+  /** reduceOnEdges (ALL): the largest edge value touching every vertex. */
+  @Test
+  @throws(classOf[Exception])
+  def testMaxWeightAllNeighbors(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(
+      new SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
+    verticesWithMaxEdgeWeight.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n"
+  }
+
+  /** Builds the Long/Long/Long graph from the shared TestGraphUtils vertex and edge data. */
+  private def createGraph(env: ExecutionEnvironment): Graph[Long, Long, Long] =
+    Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+      TestGraphUtils.getLongLongEdgeData(env), env)
+
+  /** For a vertex whose value exceeds 4, emits (vertexId, otherEndpointId) for each of its edges. */
+  final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,
+    Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    override def iterateEdges(v: Vertex[Long, Long], edges: Iterable[Edge[Long, Long]], out:
+      Collector[(Long, Long)]): Unit = {
+      // The filter depends only on the vertex, so test it once rather than once per edge.
+      if (v.getValue > 4) {
+        for (edge <- edges) {
+          // With ALL edges the vertex may be either endpoint; report the other one.
+          val neighborId = if (v.getId == edge.getTarget) edge.getSource else edge.getTarget
+          out.collect((v.getId, neighborId))
+        }
+      }
+    }
+  }
+
+  /** Emits (vertexId, otherEndpointId) for each edge in the group, using only the vertex id. */
+  final class SelectNeighbors extends EdgesFunction[Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    override def iterateEdges(edges: Iterable[(Long, Edge[Long, Long])],
+      out: Collector[(Long, Long)]): Unit = {
+      edges.foreach { case (vertexId, edge) =>
+        val neighborId = if (vertexId == edge.getTarget) edge.getSource else edge.getTarget
+        out.collect((vertexId, neighborId))
+      }
+    }
+  }
+
+  /** Keeps the smaller of two edge values. */
+  final class SelectMinWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
+    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long =
+      Math.min(firstEdgeValue, secondEdgeValue)
+  }
+
+  /** Keeps the larger of two edge values. */
+  final class SelectMaxWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
+    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long =
+      Math.max(firstEdgeValue, secondEdgeValue)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
new file mode 100644
index 0000000..52e6d7a
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
+import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  // URI of the file the current test writes its CSV output to (assigned in before()).
+  private var resultPath: String = null
+  // Contents that file must have; assigned by each test and verified in after().
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  /** Exposes the temporary folder to JUnit as a rule. */
+  @Rule
+  def getFolder(): TemporaryFolder = tempFolder
+
+  /** Reserves a fresh output file for the upcoming test. */
+  @Before
+  @throws(classOf[Exception])
+  def before(): Unit = {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  /** Compares the written CSV output line by line with `expectedResult`. */
+  @After
+  @throws(classOf[Exception])
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  /** reduceOnNeighbors (ALL): sum of all neighbor values per vertex. */
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfAllNeighborsNoValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n"
+  }
+
+  /** reduceOnNeighbors (OUT): sum of out-neighbor values per vertex. */
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfOutNeighborsNoValue(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n"
+  }
+
+  /** groupReduceOnNeighbors (ALL) with the vertex value: neighbor sum plus the vertex's own value. */
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfAllNeighbors(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
+    result.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n"
+  }
+
+  /** groupReduceOnNeighbors (IN) without the vertex value, emitting two records per vertex id > 2. */
+  @Test
+  @throws(classOf[Exception])
+  def testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph = createGraph(env)
+    val result = graph.groupReduceOnNeighbors(
+      new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
+    result.writeAsCsv(resultPath)
+    env.execute()
+    expectedResult = "3,59\n" + "3,118\n" + "4,204\n" + "4,102\n" + "5,570\n" + "5,285"
+  }
+
+  /** Builds the Long/Long/Long graph from the shared TestGraphUtils vertex and edge data. */
+  private def createGraph(env: ExecutionEnvironment): Graph[Long, Long, Long] =
+    Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+      TestGraphUtils.getLongLongEdgeData(env), env)
+
+  /** Adds two neighbor values. */
+  final class SumNeighbors extends ReduceNeighborsFunction[Long] {
+    override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long =
+      firstNeighbor + secondNeighbor
+  }
+
+  /** Emits (vertexId, sum of neighbor values + own vertex value). */
+  final class SumAllNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Long, (Long,
+    Long)] {
+    @throws(classOf[Exception])
+    def iterateNeighbors(vertex: Vertex[Long, Long], neighbors: Iterable[(Edge[Long, Long],
+      Vertex[Long, Long])], out: Collector[(Long, Long)]): Unit = {
+      // Single pass over the neighborhood (the backing group iterator is presumably one-shot).
+      val neighborSum = neighbors.foldLeft(0L) { case (acc, (_, neighbor)) =>
+        acc + neighbor.getValue
+      }
+      out.collect((vertex.getId, neighborSum + vertex.getValue))
+    }
+  }
+
+  /**
+   * Sums edge value * neighbor value over the group and, when the group's vertex id is
+   * greater than 2, emits (id, sum) and (id, 2 * sum).
+   */
+  final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo extends
+    NeighborsFunction[Long, Long, Long, (Long, Long)] {
+    @throws(classOf[Exception])
+    def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Long], Vertex[Long, Long])],
+      out: Collector[(Long, Long)]): Unit = {
+      // One pass that remembers the group's vertex id while accumulating the products.
+      // The id stays None for an empty neighborhood, which previously dereferenced null.
+      val (vertexId, sum) = neighbors.foldLeft((Option.empty[Long], 0L)) {
+        case ((_, acc), (id, edge, neighbor)) => (Some(id), acc + neighbor.getValue * edge.getValue)
+      }
+      vertexId.filter(_ > 2).foreach { id =>
+        out.collect((id, sum))
+        out.collect((id, sum * 2))
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index ff27949..8552c01 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -383,6 +383,17 @@ public class Graph<K, VV, EV> {
TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
Vertex.class, keyType, valueType);
+ return mapVertices(mapper, returnType);
+ }
+
+ /**
+ * Apply a function to the attribute of each vertex in the graph.
+ *
+ * @param mapper the map function to apply.
+ * @param returnType the explicit return type.
+ * @return a new graph
+ */
+ public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
@@ -411,6 +422,18 @@ public class Graph<K, VV, EV> {
TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
Edge.class, keyType, keyType, valueType);
+ return mapEdges(mapper, returnType);
+ }
+
+ /**
+ * Apply a function to the attribute of each edge in the graph.
+ *
+ * @param mapper the map function to apply.
+ * @param returnType the explicit return type.
+ * @return a new graph
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
DataSet<Edge<K, NV>> mappedEdges = edges.map(
new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
@@ -754,6 +777,38 @@ public class Graph<K, VV, EV> {
/**
* Compute an aggregate over the edges of each vertex. The function applied
+ * on the edges has access to the vertex value.
+ *
+ * @param edgesFunction
+ * the function to apply to the neighborhood
+ * @param direction
+ * the edge direction (in-, out-, all-)
+ * @param <T>
+ * the output type
+ * @param typeInfo the explicit return type.
+ * @return a dataset of a T
+ * @throws IllegalArgumentException
+ */
+	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
+			EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+
+		// Every branch attaches the caller-supplied typeInfo to the resulting operator via returns().
+		switch (direction) {
+			case IN:
+			case OUT: {
+				// Edge field 1 is the target id and field 0 the source id: IN co-groups each vertex
+				// with the edges pointing at it, OUT with the edges leaving it.
+				final int edgeKeyField = (direction == EdgeDirection.IN) ? 1 : 0;
+				return vertices.coGroup(edges).where(0).equalTo(edgeKeyField)
+						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction))
+						.returns(typeInfo);
+			}
+			case ALL:
+				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
+						.where(0).equalTo(0)
+						.with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction))
+						.returns(typeInfo);
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+	}
+
+ /**
+ * Compute an aggregate over the edges of each vertex. The function applied
* on the edges only has access to the vertex id (not the vertex value).
*
* @param edgesFunction
@@ -785,6 +840,40 @@ public class Graph<K, VV, EV> {
}
}
+	/**
+	 * Computes an aggregate over the edges of each vertex, using an explicitly supplied
+	 * result type. The function applied on the edges only sees the vertex id, not the
+	 * vertex value.
+	 *
+	 * @param edgesFunction the function applied to the edges of each vertex
+	 * @param direction which edges to consider: IN, OUT or ALL
+	 * @param <T> the output type
+	 * @param typeInfo the explicit type information of the output
+	 * @return a dataset of T
+	 * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+	 */
+	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
+			EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+
+		// Pair every edge with the id of the vertex it is grouped under.
+		final DataSet<Tuple2<K, Edge<K, EV>>> keyedEdges;
+		switch (direction) {
+			case IN:
+				keyedEdges = edges.map(new ProjectVertexIdMap<K, EV>(1))
+						.withForwardedFields("f1->f0");
+				break;
+			case OUT:
+				keyedEdges = edges.map(new ProjectVertexIdMap<K, EV>(0))
+						.withForwardedFields("f0");
+				break;
+			case ALL:
+				keyedEdges = edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>());
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		return keyedEdges.groupBy(0)
+				.reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction))
+				.returns(typeInfo);
+	}
+
private static final class ProjectVertexIdMap<K, EV> implements MapFunction<
Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
@@ -1412,6 +1501,51 @@ public class Graph<K, VV, EV> {
/**
* Compute an aggregate over the neighbors (edges and vertices) of each
+ * vertex. The function applied on the neighbors has access to the vertex
+ * value.
+ *
+ * @param neighborsFunction the function to apply to the neighborhood
+ * @param direction the edge direction (in-, out-, all-)
+ * @param <T> the output type
+ * @param typeInfo the explicit return type.
+ * @return a dataset of a T
+ * @throws IllegalArgumentException
+ */
+ public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
+ EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+ switch (direction) {
+ case IN:
+ // create <edge-sourceVertex> pairs
+ DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
+ .join(this.vertices).where(0).equalTo(0);
+ return vertices.coGroup(edgesWithSources)
+ .where(0).equalTo("f0.f1")
+ .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ case OUT:
+ // create <edge-targetVertex> pairs
+ DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
+ .join(this.vertices).where(1).equalTo(0);
+ return vertices.coGroup(edgesWithTargets)
+ .where(0).equalTo("f0.f0")
+ .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ case ALL:
+ // create <edge-sourceOrTargetVertex> pairs
+ DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+
+ return vertices.coGroup(edgesWithNeighbors)
+ .where(0).equalTo(0)
+ .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ }
+
+
+ /**
+ * Compute an aggregate over the neighbors (edges and vertices) of each
* vertex. The function applied on the neighbors only has access to the
* vertex id (not the vertex value).
*
@@ -1454,6 +1588,51 @@ public class Graph<K, VV, EV> {
}
}
+	/**
+	 * Computes an aggregate over the neighbors (edges and vertices) of each vertex, using an
+	 * explicitly supplied result type. The function applied on the neighbors only sees the
+	 * vertex id, not the vertex value.
+	 *
+	 * @param neighborsFunction the function applied to the neighborhood of each vertex
+	 * @param direction which neighbors to consider: IN, OUT or ALL
+	 * @param <T> the output type
+	 * @param typeInfo the explicit type information of the output
+	 * @return a dataset of T
+	 * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+	 */
+	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
+			EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+		// (vertexId, edge, neighborVertex) triples, grouped on field 0 below.
+		final DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keyedNeighbors;
+		switch (direction) {
+			case IN:
+				// create <edge-sourceVertex> pairs
+				keyedNeighbors = edges
+						.join(this.vertices).where(0).equalTo(0)
+						.with(new ProjectVertexIdJoin<K, VV, EV>(1))
+						.withForwardedFieldsFirst("f1->f0");
+				break;
+			case OUT:
+				// create <edge-targetVertex> pairs
+				keyedNeighbors = edges
+						.join(this.vertices).where(1).equalTo(0)
+						.with(new ProjectVertexIdJoin<K, VV, EV>(0))
+						.withForwardedFieldsFirst("f0");
+				break;
+			case ALL:
+				// create <edge-sourceOrTargetVertex> pairs
+				keyedNeighbors = edges
+						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+						.join(this.vertices).where(1).equalTo(0)
+						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		return keyedNeighbors.groupBy(0).reduceGroup(
+				new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+	}
+
private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index b3aec14..a0cda67 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -46,6 +46,7 @@ under the License.
<module>flink-table</module>
<module>flink-ml</module>
<module>flink-language-binding</module>
+ <module>flink-gelly-scala</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->