Posted to commits@flink.apache.org by va...@apache.org on 2015/10/07 21:15:39 UTC

flink git commit: [FLINK-2785] [gelly] implement fromCsvReader for gelly-scala; add tests and docs

Repository: flink
Updated Branches:
  refs/heads/master 6e42f9fc0 -> 47b5cb795


[FLINK-2785] [gelly] implement fromCsvReader for gelly-scala; add tests and docs

This closes #1205


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47b5cb79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47b5cb79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47b5cb79

Branch: refs/heads/master
Commit: 47b5cb79586b76cef1e3fe2b09941675799a2a38
Parents: 6e42f9f
Author: vasia <va...@apache.org>
Authored: Wed Sep 30 23:53:06 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Wed Oct 7 20:46:36 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  24 +-
 .../org/apache/flink/graph/scala/Graph.scala    | 127 +++++++++++
 .../test/GellyScalaAPICompletenessTest.scala    |   4 +-
 .../operations/GraphCreationWithCsvITCase.scala | 225 +++++++++++++++++++
 4 files changed, 376 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47b5cb79/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index e08cf32..fa2c86c 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -178,7 +178,7 @@ Graph<String, Long, Double> graph = Graph.fromCsvReader("path/to/vertex/input",
 					.types(String.class, Long.class, Double.class);
 
 
-// create a Graph with no Vertex or Edge values
+// create a Graph with neither Vertex nor Edge values
 Graph<Long, NullValue, NullValue> simpleGraph = Graph.fromCsvReader("path/to/edge/input", env).keyType(Long.class);
 {% endhighlight %}
 </div>
@@ -193,6 +193,28 @@ val edgeTuples = env.readCsvFile[String, String, Double]("path/to/edge/input")
 
 val graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)
 {% endhighlight %}
+
+* from a CSV file of Edge data and an optional CSV file of Vertex data.
+In this case, Gelly will convert each row from the Edge CSV file to an `Edge`, where the first field will be the source ID, the second field will be the target ID, and the third field (if present) will be the edge value. If the edges have no associated value, set the `hasEdgeValues` parameter to `false`. The parameter `readVertices` defines whether vertex data is provided. If `readVertices` is set to `true`, then `pathVertices` must be specified. In this case, each row from the Vertex CSV file will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. If `readVertices` is set to `false`, the vertex data will be ignored and vertices will be automatically created from the edges input.
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// create a Graph with String Vertex IDs, Long Vertex values and Double Edge values
+val graph = Graph.fromCsvReader[String, Long, Double](
+		readVertices = true,
+		pathVertices = "path/to/vertex/input",
+		pathEdges = "path/to/edge/input",
+		env = env)
+
+
+// create a Graph with neither Vertex nor Edge values
+val simpleGraph = Graph.fromCsvReader[Long, NullValue, NullValue](
+		readVertices = false,
+		pathEdges = "path/to/edge/input",
+		hasEdgeValues = false,
+		env = env)
+{% endhighlight %}
 </div>
 </div>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47b5cb79/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 35af1ed..38702f3 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -126,6 +126,133 @@ object Graph {
     wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
   }
 
+  /**
+  * Creates a Graph from a CSV file of vertices and a CSV file of edges.
+  * 
+  * @param env The Execution Environment.
+  * @param pathEdges The file path containing the edges.
+  * @param readVertices Defines whether the vertices have associated values.
+  * If set to false, the vertex input is ignored and vertices are created from the edges file.
+  * @param pathVertices The file path containing the vertices.
+  * @param hasEdgeValues Defines whether the edges have associated values. True by default.
+  * @param lineDelimiterVertices The string that separates lines in the vertices file.
+  * It defaults to newline.
+  * @param fieldDelimiterVertices The string that separates vertex Ids from vertex values
+  * in the vertices file.
+  * @param quoteCharacterVertices The character to use for quoted String parsing
+  * in the vertices file. Disabled by default.
+  * @param ignoreFirstLineVertices Whether the first line in the vertices file should be ignored.
+  * @param ignoreCommentsVertices Lines that start with the given String in the vertices file
+  * are ignored, disabled by default.
+  * @param lenientVertices Whether the parser should silently ignore malformed lines in the
+  * vertices file.
+  * @param includedFieldsVertices The fields in the vertices file that should be read.
+  * By default all fields are read.
+  * @param lineDelimiterEdges The string that separates lines in the edges file.
+  * It defaults to newline.
+  * @param fieldDelimiterEdges The string that separates fields in the edges file.
+  * @param quoteCharacterEdges The character to use for quoted String parsing
+  * in the edges file. Disabled by default.
+  * @param ignoreFirstLineEdges Whether the first line in the edges file should be ignored.
+  * @param ignoreCommentsEdges Lines that start with the given String in the edges file
+  * are ignored, disabled by default.
+  * @param lenientEdges Whether the parser should silently ignore malformed lines in the
+  * edges file.
+  * @param includedFieldsEdges The fields in the edges file that should be read.
+  * By default all fields are read.
+  * @param mapper If no vertex values are provided, this mapper can be used to initialize them.
+  * 
+  */
+  // scalastyle:off
+  // This method exceeds the max allowed number of parameters.
+  def fromCsvReader[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+    EV: TypeInformation : ClassTag](
+      env: ExecutionEnvironment,
+      pathEdges: String,
+      readVertices: Boolean,
+      pathVertices: String = null,
+      hasEdgeValues: Boolean = true,
+      lineDelimiterVertices: String = "\n",
+      fieldDelimiterVertices: String = ",",
+      quoteCharacterVertices: Character = null,
+      ignoreFirstLineVertices: Boolean = false,
+      ignoreCommentsVertices: String = null,
+      lenientVertices: Boolean = false,
+      includedFieldsVertices: Array[Int] = null,
+      lineDelimiterEdges: String = "\n",
+      fieldDelimiterEdges: String = ",",
+      quoteCharacterEdges: Character = null,
+      ignoreFirstLineEdges: Boolean = false,
+      ignoreCommentsEdges: String = null,
+      lenientEdges: Boolean = false,
+      includedFieldsEdges: Array[Int] = null,
+      mapper: MapFunction[K, VV] = null) = {
+
+    // with vertex and edge values
+    if (readVertices && hasEdgeValues) {
+      if (pathVertices == null) {
+        throw new IllegalArgumentException(
+            "The vertices file path must be specified when readVertices is true.")
+      } else {
+        val vertices = env.readCsvFile[(K, VV)](pathVertices, lineDelimiterVertices,
+            fieldDelimiterVertices, quoteCharacterVertices, ignoreFirstLineVertices,
+            ignoreCommentsVertices, lenientVertices, includedFieldsVertices)
+
+        val edges = env.readCsvFile[(K, K, EV)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
+            quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
+            includedFieldsEdges)
+     
+        fromTupleDataSet[K, VV, EV](vertices, edges, env) 
+      }
+    }
+    // with vertex value and no edge value
+    else if (readVertices && (!hasEdgeValues)) {
+      if (pathVertices == null) {
+        throw new IllegalArgumentException(
+            "The vertices file path must be specified when readVertices is true.")
+      } else {
+        val vertices = env.readCsvFile[(K, VV)](pathVertices, lineDelimiterVertices,
+            fieldDelimiterVertices, quoteCharacterVertices, ignoreFirstLineVertices,
+            ignoreCommentsVertices, lenientVertices, includedFieldsVertices)
+
+        val edges = env.readCsvFile[(K, K)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
+            quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
+            includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))
+
+        fromTupleDataSet[K, VV, NullValue](vertices, edges, env)
+      }
+    }
+    // with edge value and no vertex value
+    else if ((!readVertices) && hasEdgeValues) {
+      val edges = env.readCsvFile[(K, K, EV)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
+        quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
+        includedFieldsEdges)
+
+      // initializer provided
+      if (mapper != null) {
+        fromTupleDataSet[K, VV, EV](edges, env, mapper)
+      }
+      else {
+        fromTupleDataSet[K, EV](edges, env) 
+      }
+    }
+    // with no edge value and no vertex value
+    else {
+      val edges = env.readCsvFile[(K, K)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
+      quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges,
+      lenientEdges, includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))
+
+      // initializer provided
+      if (mapper != null) {
+        fromTupleDataSet[K, VV, NullValue](edges, env, mapper)
+      }
+      else {
+        fromTupleDataSet[K, NullValue](edges, env) 
+      }
+    }
+  }
+// scalastyle:on
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/47b5cb79/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
index c63c4f8..55faee3 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
@@ -32,9 +32,7 @@ class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
 
   override def isExcludedByName(method: Method): Boolean = {
     val name = method.getDeclaringClass.getName + "." + method.getName
-    val excludedNames = Seq("org.apache.flink.graph.Graph.getContext",
-        // NOTE: until fromCsvReader() is added to to the Scala API Graph
-        "org.apache.flink.graph.Graph.fromCsvReader")
+    val excludedNames = Seq("org.apache.flink.graph.Graph.getContext")
     excludedNames.contains(name)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47b5cb79/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
new file mode 100644
index 0000000..6ceaf16
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+import java.io.IOException
+import org.apache.flink.core.fs.FileInputSplit
+import java.io.File
+import java.io.OutputStreamWriter
+import java.io.FileOutputStream
+import com.google.common.base.Charsets
+import org.apache.flink.core.fs.Path
+import org.apache.flink.types.NullValue
+import org.apache.flink.api.common.functions.MapFunction
+
+@RunWith(classOf[Parameterized])
+class GraphCreationWithCsvITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var expectedResult: String = null
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvWithValues {
+    /*
+     * Test with two Csv files, both vertices and edges have values
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val verticesContent =  "1,1\n2,2\n3,3\n"
+    val verticesSplit = createTempFile(verticesContent)
+    val edgesContent =  "1,2,ot\n3,2,tt\n3,1,to\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, Long, String](
+        readVertices = true,
+        pathVertices = verticesSplit.getPath.toString,
+        pathEdges = edgesSplit.getPath.toString,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvNoEdgeValues {
+    /*
+     * Test with two Csv files; edges have no values
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val verticesContent =  "1,one\n2,two\n3,three\n"
+    val verticesSplit = createTempFile(verticesContent)
+    val edgesContent =  "1,2\n3,2\n3,1\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, String, NullValue](
+        readVertices = true,
+        pathVertices = verticesSplit.getPath.toString,
+        pathEdges = edgesSplit.getPath.toString,
+        hasEdgeValues = false,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,one,two,(null)\n3,2,three,two,(null)\n3,1,three,one,(null)\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvWithMapperValues {
+    /*
+     * Test with edges Csv file and vertex mapper initializer
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edgesContent =  "1,2,12\n3,2,32\n3,1,31\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, Double, Long](
+        readVertices = false,
+        pathEdges = edgesSplit.getPath.toString,
+        mapper = new VertexDoubleIdAssigner(),
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,1.0,2.0,12\n3,2,3.0,2.0,32\n3,1,3.0,1.0,31\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvNoVertexValues {
+    /*
+     * Test with edges Csv file: no vertex values
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edgesContent =  "1,2,12\n3,2,32\n3,1,31\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, NullValue, Long](
+        readVertices = false,
+        pathEdges = edgesSplit.getPath.toString,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,(null),(null),12\n3,2,(null),(null),32\n" +
+      "3,1,(null),(null),31\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvNoValues {
+    /*
+     * Test with edges Csv file: neither vertex nor edge values
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edgesContent =  "1,2\n3,2\n3,1\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, NullValue, NullValue](
+        readVertices = false,
+        pathEdges = edgesSplit.getPath.toString,
+        hasEdgeValues = false,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,(null),(null),(null)\n" +
+      "3,2,(null),(null),(null)\n3,1,(null),(null),(null)\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvOptionsVertices {
+    /*
+     * Test the options for vertices: delimiters, comments, ignore first line.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val verticesContent =  "42#42\t" + "%this-is-a-comment\t" +
+      "1#1\t" + "2#2\t" + "3#3\t"
+    val verticesSplit = createTempFile(verticesContent)
+    val edgesContent =  "1,2,ot\n3,2,tt\n3,1,to\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, Long, String](
+        readVertices = true,
+        pathVertices = verticesSplit.getPath.toString,
+        lineDelimiterVertices = "\t",
+        fieldDelimiterVertices = "#",
+        ignoreFirstLineVertices = true,
+        ignoreCommentsVertices = "%",
+        pathEdges = edgesSplit.getPath.toString,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvOptionsEdges {
+    /*
+     * Test the options for edges: delimiters, comments, ignore first line.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val verticesContent =  "1,1\n2,2\n3,3\n"
+    val verticesSplit = createTempFile(verticesContent)
+    val edgesContent =  "42#42#ignore&" + "1#2#ot&" + "3#2#tt&" + "3#1#to&" +
+      "//this-is-a-comment"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, Long, String](
+        pathVertices = verticesSplit.getPath.toString,
+        readVertices = true,
+        lineDelimiterEdges = "&",
+        fieldDelimiterEdges = "#",
+        ignoreFirstLineEdges = true,
+        ignoreCommentsEdges = "//",
+        pathEdges = edgesSplit.getPath.toString,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @throws(classOf[IOException])
+  def createTempFile(content: String): FileInputSplit = {
+    val tempFile = File.createTempFile("test_contents", "tmp")
+    tempFile.deleteOnExit()
+
+    val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), Charsets.UTF_8)
+    wrt.write(content)
+    wrt.close()
+
+    new FileInputSplit(0, new Path(tempFile.toURI.toString), 0, tempFile.length,
+        Array("localhost"));
+  }
+
+  final class VertexDoubleIdAssigner extends MapFunction[Long, Double] {
+    @throws(classOf[Exception])
+    def map(id: Long): Double = { id.toDouble }
+  }
+
+}
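
For reference, a minimal usage sketch of the Scala fromCsvReader added by this commit, combining some of the vertex-side formatting options documented in the Scaladoc above. The file paths, delimiter choices, and the driver object name are illustrative placeholders, not part of the commit:

    import org.apache.flink.api.scala._
    import org.apache.flink.graph.scala._

    // Hypothetical driver; paths and delimiters are made up for illustration.
    object FromCsvReaderSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Vertices file: "<id>#<value>" records separated by tabs,
        // with '%'-prefixed comment lines skipped.
        // Edges file: default newline/comma format with String edge values.
        val graph = Graph.fromCsvReader[Long, Long, String](
          env = env,
          readVertices = true,
          pathVertices = "/path/to/vertices",   // placeholder
          lineDelimiterVertices = "\t",
          fieldDelimiterVertices = "#",
          ignoreCommentsVertices = "%",
          pathEdges = "/path/to/edges")         // placeholder

        graph.getTriplets.print()
      }
    }

As in the new GraphCreationWithCsvITCase, passing the optional parameters by name keeps call sites readable despite the method's large parameter list.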