Posted to issues@flink.apache.org by shghatge <gi...@git.apache.org> on 2015/06/17 15:41:17 UTC

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

GitHub user shghatge opened a pull request:

    https://github.com/apache/flink/pull/847

    [FLINK-1520]Read edges and vertices from CSV files

    
    
    
    Changes:
    1) Added a GraphCsvReader class that has two CsvReaders as members, EdgeReader and VertexReader.
    
    To allow smooth method chaining when configuring the member CsvReaders, GraphCsvReader implements the configuration methods of CsvReader itself. A single call applies a setting to both CsvReaders and returns the GraphCsvReader again.
    Only the methods that specify which fields to select from the individual files are separate for the edge and vertex readers.
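
As an illustration of the chaining idea described above, here is a minimal, self-contained sketch. It does not use Flink: `SimpleCsvReader` and `TwoReaderConfig` are hypothetical stand-ins invented for this example, and the method names only mirror the style of a CSV reader configuration API. The point is that each shared setting is forwarded to both readers and the wrapper returns itself.

```java
// Hypothetical stand-in for a CSV reader; it only records its settings.
class SimpleCsvReader {
    final String path;
    String fieldDelimiter = ",";
    boolean skipFirstLine = false;

    SimpleCsvReader(String path) {
        this.path = path;
    }
}

// Hypothetical wrapper that forwards each shared setting to both readers
// and returns itself so that calls can be chained.
class TwoReaderConfig {
    final SimpleCsvReader edgeReader;
    final SimpleCsvReader vertexReader; // null when only an edge file is given

    TwoReaderConfig(String edgePath, String vertexPath) {
        this.edgeReader = new SimpleCsvReader(edgePath);
        this.vertexReader = (vertexPath == null) ? null : new SimpleCsvReader(vertexPath);
    }

    TwoReaderConfig fieldDelimiter(String delimiter) {
        edgeReader.fieldDelimiter = delimiter;
        if (vertexReader != null) {
            vertexReader.fieldDelimiter = delimiter;
        }
        return this;
    }

    TwoReaderConfig ignoreFirstLine() {
        edgeReader.skipFirstLine = true;
        if (vertexReader != null) {
            vertexReader.skipFirstLine = true;
        }
        return this;
    }
}

class ChainingDemo {
    public static void main(String[] args) {
        // One chained expression configures both underlying readers.
        TwoReaderConfig config = new TwoReaderConfig("edges.csv", "vertices.csv")
                .fieldDelimiter(";")
                .ignoreFirstLine();
        System.out.println(config.edgeReader.fieldDelimiter + " " + config.vertexReader.skipFirstLine);
    }
}
```

The null check on the vertex reader reflects the case where only an edge file is supplied, which the pull request handles by leaving the vertex reader unset.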
    
    Because type erasure makes it necessary to specify the types, GraphCsvReader has a types method that returns a Graph with the specified types for the vertex ID, vertex value and edge value. The alternative was to pass the types to the method that constructs the graph itself, but this approach was chosen to stay as close to CsvReader as possible.
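
For readers unfamiliar with the type-erasure point above, the following sketch shows in plain Java why a `Class` token has to be passed explicitly. The `parseField` helper is hypothetical and invented for this example; it is not part of Flink or of this pull request.

```java
import java.util.ArrayList;
import java.util.List;

class TypeErasureDemo {
    // Hypothetical parser: the Class token tells it which type to build,
    // because the type parameter T is not available at runtime.
    static <T> T parseField(String raw, Class<T> type) {
        if (type == Long.class) {
            return type.cast(Long.valueOf(raw));
        } else if (type == Double.class) {
            return type.cast(Double.valueOf(raw));
        } else if (type == String.class) {
            return type.cast(raw);
        }
        throw new IllegalArgumentException("Unsupported field type: " + type.getName());
    }

    public static void main(String[] args) {
        List<Long> longs = new ArrayList<>();
        List<String> strings = new ArrayList<>();
        // Both lists share one runtime class, so the element type cannot be
        // recovered from the object itself.
        System.out.println(longs.getClass() == strings.getClass());

        // With an explicit Class token the parser knows what to produce.
        Long id = parseField("42", Long.class);
        Double weight = parseField("0.5", Double.class);
        System.out.println(id + " " + weight);
    }
}
```

A reader that turns text fields into typed tuples is in the same position as `parseField`: without the class arguments it has no way to know which field types the caller expects.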
    
    2) Added 3 methods to Graph.java, similar to the other methods for Graph creation. These methods take one mandatory path, one optional path and an optional mapper. The only difference is that they return an instance of GraphCsvReader instead of a Graph.
    
    3) Added appropriate methods to GraphCreationITCase.java and GraphCreationWithMapperITCase.java.
    Also added a createTempFile() method to both to help with the tests.
    
    4) Added documentation for the new functionality to gelly_guide.md.
    
    
    Closed the previous pull request and opened a new one from a fresh branch because the previous changes have not been merged yet.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shghatge/flink csv_clear_pull

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/847.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #847
    
----
commit b7c1079f9fe56a2586f36f8b5eca5208b33e9cf8
Author: Shivani <sh...@gmail.com>
Date:   2015-06-17T13:37:36Z

    [FLINK-1520]Read edges and vertices from CSV files

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r34504887
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +@SuppressWarnings({"unused" , "unchecked"})
    +public class GraphCsvReader<K,VV,EV> {
    +
    +	private final Path vertexPath,edgePath;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +	protected Class<K> vertexKey;
    +	protected Class<VV> vertexValue;
    +	protected Class<EV> edgeValue;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +	public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = vertexPath;
    +		this.edgePath = edgePath;
    +		this.VertexReader = new CsvReader(vertexPath,context);
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),
    +				new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context);
    +	}
    +
    +	//--------------------------------------------------------------------------------------------------------------------
    +	/**
    +	 * Specifies the types for the edges fields and returns this instance of GraphCsvReader
    +	 *
    +	 * @param vertexKey The type of Vetex ID in the Graph.
    +	 * @param  edgeValue The type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
    +	 */
    +	public GraphCsvReader typesEdges(Class<K> vertexKey, Class<EV> edgeValue) {
    +		this.vertexKey = vertexKey;
    +		this.edgeValue = edgeValue;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the types for the edges fields and returns this instance of GraphCsvReader
    +	 * This method is overloaded for the case when the type of EdgeValue is NullValue
    +	 * @param vertexKey The type of Vetex ID in the Graph.
    +	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
    +	 */
    +	public GraphCsvReader typesEdges(Class<K> vertexKey) {
    +		this.vertexKey = vertexKey;
    +		this.edgeValue = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the types for the vertices fields and returns an instance of Graph
    +	 * @param vertexKey The type of Vertex ID in the Graph.
    +	 * @param vertexValue The type of Vertex Value in the Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph}
    +	 */
    +	public Graph<K, VV, EV> typesVertices(Class vertexKey, Class vertexValue) {
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(this.vertexKey,this.vertexKey, this.edgeValue);
    +		if(mapper == null && this.VertexReader != null) {
    +		DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(vertexKey, vertexValue);
    --- End diff --
    
    The indentation is broken here: the DataSet<Tuple2> line should be indented by one more tab.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-119705061
  
    Updated PR



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-121072347
  
    I still think this is overkill. Could an alternative be to have only `types(K, VV, EV)` with all 3 arguments and expect `NullValue` if a value is missing?
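
One way to read this suggestion is sketched below in plain Java. This is only an illustration of the idea, not code from the pull request: `NullValueMarker` is a hypothetical stand-in for Flink's `NullValue`, and `describe` merely reports what a single three-argument method could decide from its class arguments.

```java
// Hypothetical stand-in for Flink's NullValue type, used only as a marker here.
final class NullValueMarker {
    private NullValueMarker() {}
}

class TypesSketch {
    // Reports which values a single types(K, VV, EV)-style call would read.
    // Passing the marker class means "this value is missing".
    static String describe(Class<?> key, Class<?> vertexValue, Class<?> edgeValue) {
        boolean hasVertexValue = vertexValue != NullValueMarker.class;
        boolean hasEdgeValue = edgeValue != NullValueMarker.class;
        return "key=" + key.getSimpleName()
                + ", vertexValues=" + (hasVertexValue ? vertexValue.getSimpleName() : "none")
                + ", edgeValues=" + (hasEdgeValue ? edgeValue.getSimpleName() : "none");
    }

    public static void main(String[] args) {
        System.out.println(describe(Long.class, String.class, Double.class));
        System.out.println(describe(Long.class, NullValueMarker.class, Double.class));
        System.out.println(describe(Long.class, String.class, NullValueMarker.class));
    }
}
```

Under this assumption, one method covers the cases that the separate overloads in the diff handle individually, at the cost of a runtime check on the class arguments.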



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r34504919
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +@SuppressWarnings({"unused" , "unchecked"})
    +public class GraphCsvReader<K,VV,EV> {
    +
    +	private final Path vertexPath,edgePath;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +	protected Class<K> vertexKey;
    +	protected Class<VV> vertexValue;
    +	protected Class<EV> edgeValue;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +	public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = vertexPath;
    +		this.edgePath = edgePath;
    +		this.VertexReader = new CsvReader(vertexPath,context);
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),
    +				new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context);
    +	}
    +
    +	//--------------------------------------------------------------------------------------------------------------------
    +	/**
    +	 * Specifies the types for the edges fields and returns this instance of GraphCsvReader
    +	 *
    +	 * @param vertexKey The type of Vetex ID in the Graph.
    +	 * @param  edgeValue The type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
    +	 */
    +	public GraphCsvReader typesEdges(Class<K> vertexKey, Class<EV> edgeValue) {
    +		this.vertexKey = vertexKey;
    +		this.edgeValue = edgeValue;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the types for the edges fields and returns this instance of GraphCsvReader
    +	 * This method is overloaded for the case when the type of EdgeValue is NullValue
    +	 * @param vertexKey The type of Vetex ID in the Graph.
    +	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
    +	 */
    +	public GraphCsvReader typesEdges(Class<K> vertexKey) {
    +		this.vertexKey = vertexKey;
    +		this.edgeValue = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the types for the vertices fields and returns an instance of Graph
    +	 * @param vertexKey The type of Vertex ID in the Graph.
    +	 * @param vertexValue The type of Vertex Value in the Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph}
    +	 */
    +	public Graph<K, VV, EV> typesVertices(Class vertexKey, Class vertexValue) {
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(this.vertexKey,this.vertexKey, this.edgeValue);
    +		if(mapper == null && this.VertexReader != null) {
    +		DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(vertexKey, vertexValue);
    +		return Graph.fromTupleDataSet(vertices, edges, executionContext);
    +		} else if(this.mapper != null) {
    +		return Graph.fromTupleDataSet(edges, this.mapper, executionContext);
    +		} else {
    --- End diff --
    
    The same indentation problem appears around here.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge closed the pull request at:

    https://github.com/apache/flink/pull/847



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33354695
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV>{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction<K, VV> mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction<K, VV> mapper, ExecutionEnvironment context)
    +	{
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(type0, type0, type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(type0, type1);
    +			return Graph.fromTupleDataSet(vertices, edges, executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges, this.mapper, executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types in the special case
    +	 * where Vertices don't have a value
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public Graph<K, NullValue, EV> typesVertexValueNull(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(type0, type0, type1);
    +		return Graph.fromTupleDataSet(edges, executionContext);
    +	}
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types ain the special case
    +	 * where Edges don't have a value
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public Graph<K, VV, NullValue> typesEdgeValueNull(Class<K> type0, Class<VV> type1)
    +	{
    +		DataSet<Tuple3<K, K, NullValue>> edges = this.EdgeReader.types(type0, type0)
    +				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
    +					@Override
    +					public Tuple3<K, K, NullValue> map(Tuple2<K, K> tuple2) throws Exception {
    +						return new Tuple3<K, K, NullValue>(tuple2.f0, tuple2.f1, NullValue.getInstance());
    +					}
    +				});
    +
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(type0, type1);
    +			return Graph.fromTupleDataSet(vertices, edges, executionContext);
    --- End diff --
    
    This should be formatted as:
    } else {
    ...
    }



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33823666
  
    --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java ---
    @@ -54,16 +75,13 @@ public void testCreateWithoutVertexValues() throws Exception {
     		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     		Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
     
    -        DataSet<Vertex<Long,NullValue>> data = graph.getVertices();
    -        List<Vertex<Long,NullValue>> result= data.collect();
    -        
    +		graph.getVertices().writeAsCsv(resultPath);
    --- End diff --
    
    Hmm, it seems you're reverting the changes of #863?



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-115689621
  
    Hi @shghatge ,
    
    I left my set of comments inline. They are mostly related to coding style issues. I guess you should revisit the previous comments here.
    
    Also, don't forget to rebase. It seems like there are some merge conflicts that need to be fixed :)




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-130014300
  
    I only saw the "I will also update the documentation" note afterwards... Sorry!



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33823720
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV> {
    +
    +	private final Path vertexPath,edgePath;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = vertexPath;
    +		this.edgePath = edgePath;
    +		this.VertexReader = new CsvReader(vertexPath,context);
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context);
    +	}
    +
    +	public CsvReader getEdgeReader() {
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader() {
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    --- End diff --
    
    Yes. I explained how it is passed to the edgeReader and vertexReader and, at the end, what it signifies in the graph, i.e. the key type for the vertex. Should I keep only the last part?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32674875
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    --- End diff --
    
    Could you please elaborate on which types? Should I pass types to the mapper function here too? The functions seemed to work without the type arguments for MapFunction so I thought it would be okay. I will work on this issue.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-113165614
  
    Updated PR



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32679424
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    --- End diff --
    
    Yes, you need to define K and VV in the class definition if you want to use them here, like `GraphCsvReader<K, VV, EV>`. If you find that the types are not needed, you can simply suppress the warning.
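
A minimal sketch of that suggestion, using a hypothetical stand-in class rather than the real `GraphCsvReader` (the names `TypedReaderSketch`, `TypedReaderDemo` and `vertexValueFor` are assumptions made for illustration, and `java.util.function.Function` stands in for Flink's `MapFunction`). Declaring the type parameters on the class lets a field such as the mapper be fully typed, whereas parameters declared on a single method are visible only inside that method.

```java
import java.util.function.Function;

// Hypothetical stand-in for GraphCsvReader; not the actual Flink class.
class TypedReaderSketch<K, VV, EV> {

    // K and VV are declared on the class, so the field can be fully typed
    // instead of being a raw type that produces unchecked warnings.
    private final Function<K, VV> mapper;

    TypedReaderSketch(Function<K, VV> mapper) {
        this.mapper = mapper;
    }

    // Uses the class-level K and VV; no method-level <K, VV> declaration is needed.
    VV vertexValueFor(K vertexId) {
        return mapper.apply(vertexId);
    }
}

class TypedReaderDemo {
    public static void main(String[] args) {
        // The type arguments are fixed once, when the reader is created.
        TypedReaderSketch<Long, Double, String> reader =
                new TypedReaderSketch<>(id -> id * 0.5);
        System.out.println(reader.vertexValueFor(4L));
    }
}
```

If the class stays non-generic, the alternative mentioned above is to keep the raw mapper field and annotate the affected method with `@SuppressWarnings("unchecked")`.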



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33353966
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    --- End diff --
    
    There is a trailing "from Tuple3" here...



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-113439469
  
    Hello @Andra
    There is one test for fromCsv with mapper in GraphCreationWithMapperITCase.java
    Should I add more tests for that?
    
    Also, regarding the comment about examples: do you mean that I should update the Gelly guide by removing the CSV file examples that use env.readCsvFile("")?
    
    I will add the other tests. :)



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33823849
  
    --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java ---
    @@ -52,16 +72,13 @@ public void testWithDoubleValueMapper() throws Exception {
     		Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
     				new AssignDoubleValueMapper(), env);
     
    -        DataSet<Vertex<Long,Double>> data = graph.getVertices();
    -        List<Vertex<Long,Double>> result= data.collect();
    -		
    +		graph.getVertices().writeAsCsv(resultPath);
    --- End diff --
    
    Same here. We changed the tests to use `collect()` instead of files in #863. Please don't change it back ;)
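
As a rough illustration of the `collect()`-based style, the sketch below checks an in-memory result list against expected lines without writing any file. The helper `CollectedResultCheck.matches` is hypothetical and is not one of Flink's test utilities; the hard-coded list stands in for what `graph.getVertices().collect()` would return in the real test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink's test utilities.
class CollectedResultCheck {

    // Compares the string form of each collected element with the expected
    // lines, ignoring order, because a parallel job gives no ordering guarantee.
    static <T> boolean matches(List<T> collected, String expected) {
        List<String> actualLines = new ArrayList<>();
        for (T element : collected) {
            actualLines.add(String.valueOf(element));
        }
        List<String> expectedLines = new ArrayList<>(Arrays.asList(expected.split("\n")));
        Collections.sort(actualLines);
        Collections.sort(expectedLines);
        return actualLines.equals(expectedLines);
    }

    public static void main(String[] args) {
        // Stand-in for the list that graph.getVertices().collect() would return.
        List<String> collected = Arrays.asList("2,0.2", "1,0.1");
        System.out.println(matches(collected, "1,0.1\n2,0.2"));
    }
}
```

Keeping the comparison in memory avoids temporary result paths and the cleanup they require.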



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-114975168
  
    Updated PR



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-123057981
  
    The only problem with assuming NullValue when a value is missing is that we can't return NullValue in place of VV.
    That is, in Graph<K, VV, EV>, VV or EV can't be NullValue; otherwise that is what I was originally going for.
    Since none of the other methods that create a DataSet/Graph offer a way to set the edge value to NullValue and instead expect the user to map it (at least that is what I saw), maybe we could just remove the functionality. I only added it because many examples seemed to use it, so I thought it would be nice to have.
    In any case, we could also keep a single typesNullEdge method: users who don't want it can use the normal overloaded types methods, with 3 arguments for no NullValue, 2 arguments for null vertex values, and 1 argument for null vertex and edge values, plus one method named typesNullEdge for the case where only edges have NullValue.
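
The overload scheme described above can be sketched as follows. All classes here (`NullValueSketch`, `GraphSketch`, `ReaderOverloadsSketch`, `ReaderOverloadsDemo`) are hypothetical stand-ins, not the real Gelly types, and the method bodies only record which value types were chosen. The sketch also shows why the edge-only case needs its own name: a second two-argument `types(Class, Class)` would have the same erasure as the vertex-only overload and would not compile.

```java
// Hypothetical stand-ins for illustration; these are not the Flink/Gelly classes.
final class NullValueSketch {
    private NullValueSketch() {}
}

final class GraphSketch<K, VV, EV> {
    final Class<?> vertexValueType;
    final Class<?> edgeValueType;

    GraphSketch(Class<?> vertexValueType, Class<?> edgeValueType) {
        this.vertexValueType = vertexValueType;
        this.edgeValueType = edgeValueType;
    }
}

class ReaderOverloadsSketch {

    // 3 arguments: both vertices and edges carry a value.
    <K, VV, EV> GraphSketch<K, VV, EV> types(Class<K> id, Class<VV> vertexValue, Class<EV> edgeValue) {
        return new GraphSketch<>(vertexValue, edgeValue);
    }

    // 2 arguments: vertices have no value, edges do.
    <K, EV> GraphSketch<K, NullValueSketch, EV> types(Class<K> id, Class<EV> edgeValue) {
        return new GraphSketch<>(NullValueSketch.class, edgeValue);
    }

    // 1 argument: neither vertices nor edges have a value.
    <K> GraphSketch<K, NullValueSketch, NullValueSketch> types(Class<K> id) {
        return new GraphSketch<>(NullValueSketch.class, NullValueSketch.class);
    }

    // Separately named: only edges have no value. This cannot be another
    // types(Class, Class) overload because the erased signatures would clash.
    <K, VV> GraphSketch<K, VV, NullValueSketch> typesNullEdge(Class<K> id, Class<VV> vertexValue) {
        return new GraphSketch<>(vertexValue, NullValueSketch.class);
    }
}

class ReaderOverloadsDemo {
    public static void main(String[] args) {
        ReaderOverloadsSketch reader = new ReaderOverloadsSketch();
        GraphSketch<Long, NullValueSketch, Double> noVertexValues = reader.types(Long.class, Double.class);
        GraphSketch<Long, String, NullValueSketch> noEdgeValues = reader.typesNullEdge(Long.class, String.class);
        System.out.println(noVertexValues.vertexValueType.getSimpleName());
        System.out.println(noEdgeValues.edgeValueType.getSimpleName());
    }
}
```

With this layout the return type of each method states explicitly which of the two values is absent, so the caller never has to pass a NullValue class as an argument.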



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33354136
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param verticesPath path to a CSV file with the Vertices data.
    +	* @param edgesPath path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	*		 Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +	public static  GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
    +		return new GraphCsvReader(verticesPath, edgesPath, context);
    +	}
    +	/** Creates a graph from a CSV file for Edges., Vertices are
    --- End diff --
    
    ... Edges. \n (right now it's .,)
    Vertices....




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32674930
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	@Deprecated
    +	public GraphCsvReader fieldDelimiter(char delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader fieldDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    --- End diff --
    
    same


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33822502
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV> {
    +
    +	private final Path vertexPath,edgePath;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = vertexPath;
    +		this.edgePath = edgePath;
    +		this.VertexReader = new CsvReader(vertexPath,context);
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context);
    +	}
    +
    +	public CsvReader getEdgeReader() {
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader() {
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    --- End diff --
    
    I think this description is quite confusing.
    Is `type0` the key type, `type1` the vertex value type and `type2` the edge value type?
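
The mapping asked about here can be read off the quoted `types(type0, type1, type2)` body: the first argument is used for both ID columns of the edge file and the ID column of the vertex file, the second for the vertex value, and the third for the edge value. A minimal sketch of self-describing parameter names follows. `GraphTypesSketch`, `edgeColumns` and `vertexColumns` are hypothetical names used only for illustration and are not part of Gelly.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; this is not the GraphCsvReader API. */
class GraphTypesSketch {

    /**
     * Column types of the edge CSV file: (source ID, target ID, edge value).
     * The vertex key type is used twice because both endpoints are vertex IDs.
     */
    static <K, EV> List<Class<?>> edgeColumns(Class<K> vertexKey, Class<EV> edgeValue) {
        return Arrays.<Class<?>>asList(vertexKey, vertexKey, edgeValue);
    }

    /** Column types of the vertex CSV file: (vertex ID, vertex value). */
    static <K, VV> List<Class<?>> vertexColumns(Class<K> vertexKey, Class<VV> vertexValue) {
        return Arrays.<Class<?>>asList(vertexKey, vertexValue);
    }
}
```

Naming the parameters after their role (`vertexKey`, `vertexValue`, `edgeValue`) lets the Javadoc stay short, since the reader no longer has to map `type0`..`type2` to CSV columns by hand.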



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-133727559
  
    Thanks for the comments @andralungu!
    @shghatge, can you please close this PR? I will make the docs update and open a new one, which will include your work and my changes if that's OK with you. Thank you!



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-120856171
  
    Hi,
    
    I just had a closer look at this PR and it made me seriously question the utility of a `Graph.fromCSV` method. Why? First of all, because it's more limited than the regular `env.readCsvFile()`, in the sense that it does not allow POJOs, and it would be a bit tedious to support them. There would be a need for methods with 2 to n fields, according to the number of attributes present in the POJO.
    
    Second, because, and I am speaking strictly as a user here, I would rather write:
    private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
    
    		if(fileOutput) {
    			return env.readCsvFile(edgeInputPath)
    					.ignoreComments("#")
    					.fieldDelimiter("\t")
    					.lineDelimiter("\n")
    					.types(Long.class, Long.class, Double.class)
    					.map(new Tuple3ToEdgeMap<Long, Double>());
    		} else {
    			return CommunityDetectionData.getDefaultEdgeDataSet(env);
    		}
    	}
    
    than...
    
    private static Graph<Long, Long, Double> getGraph(ExecutionEnvironment env) {
    		Graph<Long, Long, Double> graph;
    		if(!fileOutput) {
    			DataSet<Edge<Long, Double>> edges = CommunityDetectionData.getDefaultEdgeDataSet(env);
    			graph = Graph.fromDataSet(edges,
    					new MapFunction<Long, Long>() {
    
    						public Long map(Long label) {
    							return label;
    						}
    					}, env);
    		} else {
    			graph = Graph.fromCsvReader(edgeInputPath,new MapFunction<Long, Long>() {
    				public Long map(Long label) {
    					return label;
    				}
    			}, env).ignoreCommentsEdges("#")
    					.fieldDelimiterEdges("\t")
    					.lineDelimiterEdges("\n")
    					.typesEdges(Long.class, Double.class)
    					.typesVertices(Long.class, Long.class);
    		}
    		return graph;
    	}
    
    Maybe it's just a preference thing... but I believe it's at least worth a discussion. On the other hand, the utility of such a method should have been questioned from its early Jira days, so I guess that's my mistake.
    
    I would like to hear your thoughts on this. 
    Thanks!



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-119698681
  
    Updated PR



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32673330
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,58 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param path1 path to a CSV file with the Vertices data.
    +	* @param path2 path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	*		 Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static  GraphCsvReader fromCsvReader(String path1, String path2, ExecutionEnvironment context){
    +		return (new GraphCsvReader(path1,path2,context));
    +	}
    +	/** Creates a graph from a CSV file for Edges., Vertices are
    +	* induced from the edges.
    +	*
    +	* Edges with value are created from a CSV file with 3 fields. Vertices are created
    +	* automatically and their values are set to NullValue.
    +	*
    +	* @param path a path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	* Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static GraphCsvReader fromCsvReader(String path, ExecutionEnvironment context){
    +		return (new GraphCsvReader(path,context));
    +	}
    +
    +	/**
    +	 *Creates a graph from a CSV file for Edges., Vertices are
    +	 * induced from the edges and vertex values are calculated by a mapper
    +	 * function.  Edges with value are created from a CSV file with 3 fields.
    +	 * Vertices are created automatically and their values are set by applying the provided map
    +	 * function to the vertex ids.
    +	 *
    +	 * @param path a path to a CSV file with the Edges data
    +	 * @param mapper the mapper function.
    +	 * @param context the flink execution environment.
    +	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	 * Vertex ID, Vertex Value and Edge value returns a Graph
    +	 */
    +
    +	public static GraphCsvReader fromCsvReader(String path, final MapFunction mapper,ExecutionEnvironment context)
    --- End diff --
    
    add type arguments to MapFunction
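
To illustrate why the type arguments matter, here is a small sketch that compiles without Flink. `Mapper` is a simplified stand-in for `MapFunction` (an assumption: unlike the real interface, it declares no checked exception), and `TypedMapperSketch` with its methods is hypothetical. With a raw mapper type, a mismatched vertex ID is only detected at run time; with `Mapper<K, VV>` the compiler rejects it.

```java
/** Simplified stand-in for Flink's MapFunction so the sketch needs no dependencies. */
interface Mapper<T, O> {
    O map(T value);
}

class TypedMapperSketch {

    /** A fully parameterized mapper from vertex ID to vertex value. */
    static final Mapper<Long, Long> DOUBLER = new Mapper<Long, Long>() {
        @Override
        public Long map(Long id) {
            return id * 2;
        }
    };

    /** Parameterized signature: the mapper input type must match the key type K. */
    static <K, VV> VV vertexValue(K vertexId, Mapper<K, VV> mapper) {
        return mapper.map(vertexId);
    }

    /** Raw signature, similar in spirit to the reviewed one: any argument is accepted. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object vertexValueRaw(Object vertexId, Mapper mapper) {
        return mapper.map(vertexId);
    }

    /** Returns true if the raw call with a String ID fails with a ClassCastException. */
    static boolean rawCallFails() {
        try {
            vertexValueRaw("not-a-long", DOUBLER);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

The call `vertexValue("not-a-long", DOUBLER)` would not compile, whereas the raw variant compiles and fails only when the mapper is invoked.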



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32674923
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	@Deprecated
    +	public GraphCsvReader fieldDelimiter(char delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    --- End diff --
    
    same
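
Independently of what this remark refers to, one thing is visible in the quoted diff: the edge-only constructors set `VertexReader` to null, while the delimiter methods call it unconditionally, which would throw a `NullPointerException` in that configuration. A minimal sketch of guarded forwarding is shown below. `ReaderStub` and `GuardedGraphReaderSketch` are hypothetical stand-ins, not Flink classes, and this is only one possible way to handle the case.

```java
/** Hypothetical stand-in for a CSV reader that just records its row delimiter. */
class ReaderStub {
    String rowDelimiter = "\n";

    ReaderStub lineDelimiter(String delimiter) {
        this.rowDelimiter = delimiter;
        return this;
    }
}

/** Hypothetical stand-in for a graph reader wrapping an edge and an optional vertex reader. */
class GuardedGraphReaderSketch {
    final ReaderStub edgeReader = new ReaderStub();
    final ReaderStub vertexReader; // null when only an edge file was given

    GuardedGraphReaderSketch(boolean hasVertexFile) {
        this.vertexReader = hasVertexFile ? new ReaderStub() : null;
    }

    /** Forwards the setting to both readers, skipping the vertex reader if absent. */
    GuardedGraphReaderSketch lineDelimiter(String delimiter) {
        edgeReader.lineDelimiter(delimiter);
        if (vertexReader != null) {
            vertexReader.lineDelimiter(delimiter);
        }
        return this; // keep the fluent chaining of the original method
    }
}
```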



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32672995
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,58 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param path1 path to a CSV file with the Vertices data.
    +	* @param path2 path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	*		 Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static  GraphCsvReader fromCsvReader(String path1, String path2, ExecutionEnvironment context){
    +		return (new GraphCsvReader(path1,path2,context));
    +	}
    +	/** Creates a graph from a CSV file for Edges., Vertices are
    +	* induced from the edges.
    +	*
    +	* Edges with value are created from a CSV file with 3 fields. Vertices are created
    +	* automatically and their values are set to NullValue.
    +	*
    +	* @param path a path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	* Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static GraphCsvReader fromCsvReader(String path, ExecutionEnvironment context){
    +		return (new GraphCsvReader(path,context));
    +	}
    +
    +	/**
    +	 *Creates a graph from a CSV file for Edges., Vertices are
    +	 * induced from the edges and vertex values are calculated by a mapper
    +	 * function.  Edges with value are created from a CSV file with 3 fields.
    +	 * Vertices are created automatically and their values are set by applying the provided map
    +	 * function to the vertex ids.
    +	 *
    +	 * @param path a path to a CSV file with the Edges data
    +	 * @param mapper the mapper function.
    +	 * @param context the flink execution environment.
    +	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	 * Vertex ID, Vertex Value and Edge value returns a Graph
    +	 */
    +
    +	public static GraphCsvReader fromCsvReader(String path, final MapFunction mapper,ExecutionEnvironment context)
    +	{
    +		return (new GraphCsvReader(path,mapper,context));
    --- End diff --
    
    same applies here :)



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32672778
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,58 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param path1 path to a CSV file with the Vertices data.
    +	* @param path2 path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	*		 Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    --- End diff --
    
    remove new line



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33354535
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV>{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    --- End diff --
    
    Also, @vasia mentioned some general coding style rules in one of the previous comments. Opening braces must be placed consistently, so here the brace should open on the same line as the declaration: `public GraphCsvReader(...) {`.
    
    Please check the rest of the file for similar issues.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-121032514
  
    Hi @vasia, 
    
    I also saw the types issue, but I had a feeling that this is the way it was decided in the previous comment. I would rather have different names for 2 and 3 than force a call to `typesVertices` if it's not needed.


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32673158
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    --- End diff --
    
    Duplicate import: `org.apache.flink.core.fs.Path` is already imported above.


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-121215586
  
    Yes, I mean `NullValue.class` :)
    I'd like to hear @shghatge's opinion, too!


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32674993
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	@Deprecated
    +	public GraphCsvReader fieldDelimiter(char delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader fieldDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Enables quoted String parsing. Field delimiters in quoted Strings are ignored.
    +	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
    +	 * Leading or tailing whitespaces are not allowed.
    +	 *
    +	 * @param quoteCharacter The character which is used as quoting character.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +	public GraphCsvReader parseQuotedStrings(char quoteCharacter) {
    +		this.EdgeReader.parseQuotedStrings(quoteCharacter);
    +		this.VertexReader.parseQuotedStrings(quoteCharacter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures the string that starts comments.
    +	 * By default comments will be treated as invalid lines.
    +	 * This function only recognizes comments which start at the beginning of the line!
    +	 *
    +	 * @param commentPrefix The string that starts the comments.
    +	 * @return The Graph csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader ignoreComments(String commentPrefix) {
    +		this.EdgeReader.ignoreComments(commentPrefix);
    +		this.VertexReader.ignoreComments(commentPrefix);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
    +	 * array. The parser will skip over all fields where the boolean value at the corresponding position
    +	 * in the array is {@code false}. The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code true}.
    +	 * The number of fields in the result is consequently equal to the number of times that {@code true}
    +	 * occurs in the fields array.
    +	 *
    +	 * @param vertexFields The array of flags that describes which fields are to be included from the CSV file for vertices.
    +	 * @return The CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader includeVertexFields(boolean ... vertexFields) {
    +		this.VertexReader.includeFields(vertexFields);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
    +	 * array. The parser will skip over all fields where the boolean value at the corresponding position
    +	 * in the array is {@code false}. The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code true}.
    +	 * The number of fields in the result is consequently equal to the number of times that {@code true}
    +	 * occurs in the fields array.
    +	 *
    +	 * @param edgeFields The array of flags that describes which fields are to be included from the CSV file for edges.
    +	 * @return The CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +	public GraphCsvReader includeEdgeFields(boolean ... edgeFields) {
    +		this.EdgeReader.includeFields(edgeFields);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * positions in the string (read from position 0 to its length) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string
    +	 * The parser will skip over all fields where the character at the corresponding position
    +	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
    +	 * {@code false}). The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
    +	 *
    +	 * @param mask The string mask defining which fields to include and which to skip.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeVertexFields(String mask) {
    +		this.VertexReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * positions in the string (read from position 0 to its length) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string
    +	 * The parser will skip over all fields where the character at the corresponding position
    +	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
    +	 * {@code false}). The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
    +	 *
    +	 * @param mask The string mask defining which fields to include and which to skip.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeEdgeFields(String mask) {
    +		this.VertexReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * bits in the value (read from least significant to most significant) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
    +	 * non-zero bit.
    +	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
    +	 * include the fields where the corresponding bit is one.
    +	 * <p>
    +	 * Examples:
    +	 * <ul>
    +	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
    +	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
    +	 *       two and three, skip fields four and five, and include field six.</li>
    +	 * </ul>
    +	 *
    +	 * @param mask The bit mask defining which fields to include and which to skip.
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeVertexFields(long mask) {
    +		this.VertexReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * bits in the value (read from least significant to most significant) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
    +	 * non-zero bit.
    +	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
    +	 * include the fields where the corresponding bit is one.
    +	 * <p>
    +	 * Examples:
    +	 * <ul>
    +	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
    +	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
    +	 *       two and three, skip fields four and five, and include field six.</li>
    +	 * </ul>
    +	 *
    +	 * @param mask The bit mask defining which fields to include and which to skip.
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeEdgeFields(long mask) {
    +		this.VertexReader.includeFields(mask);
    +		return this;
    +	}
    +
    +
    +
    +	/**
    +	 * Sets the CSV readers for the Edges and Vertices files to ignore the first line. This is useful for files that contain a header line.
    +	 *
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader ignoreFirstLine() {
    +		this.EdgeReader.ignoreFirstLine();
    +		this.VertexReader.ignoreFirstLine();
    --- End diff --
    
    and here


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-121039562
  
    Yes, but then you would have the following methods: `types`, `typesNoEdgeValue`, `typesNoVertexValue` and again `types`. So even if it's not strictly needed, I'd try to keep the naming consistent. We could also make it more graph-oriented (the name `types` was generic). The following is just an example:
    1) `keyType(K)`
    2) `keyAndVertexTypes(K, VV)`
    3) `keyAndEdgeTypes(K, EV)`
    4) `keyVertexAndEdgeTypes(K, VV, EV)`
    
    With good documentation, I think it would be clear what these are for :)
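
The four names proposed above can be sketched as follows. This is only an illustration under stated assumptions: `NoValue` is a hypothetical stand-in for Flink's `NullValue`, and `GraphTypes` is a hypothetical result type that merely records the chosen types, so the example has no Flink dependency. It also shows why variants 2 and 3 cannot both be called `types`: they take the same parameter list, `(Class, Class)`, so Java would reject them as overloads of one name.

```java
// Hypothetical stand-in for Flink's NullValue.
final class NoValue {
    private NoValue() {}
}

// Hypothetical result type: it only records the key, vertex value and edge value types.
final class GraphTypes<K, VV, EV> {
    final Class<K> key;
    final Class<VV> vertexValue;
    final Class<EV> edgeValue;

    GraphTypes(Class<K> key, Class<VV> vertexValue, Class<EV> edgeValue) {
        this.key = key;
        this.vertexValue = vertexValue;
        this.edgeValue = edgeValue;
    }
}

final class TypeNamingSketch {

    // 1) Neither vertices nor edges carry a value.
    static <K> GraphTypes<K, NoValue, NoValue> keyType(Class<K> key) {
        return new GraphTypes<>(key, NoValue.class, NoValue.class);
    }

    // 2) Only vertices carry a value.
    static <K, VV> GraphTypes<K, VV, NoValue> keyAndVertexTypes(Class<K> key, Class<VV> vertexValue) {
        return new GraphTypes<>(key, vertexValue, NoValue.class);
    }

    // 3) Only edges carry a value. Same parameter list as 2), hence the distinct name.
    static <K, EV> GraphTypes<K, NoValue, EV> keyAndEdgeTypes(Class<K> key, Class<EV> edgeValue) {
        return new GraphTypes<>(key, NoValue.class, edgeValue);
    }

    // 4) Both vertices and edges carry a value.
    static <K, VV, EV> GraphTypes<K, VV, EV> keyVertexAndEdgeTypes(
            Class<K> key, Class<VV> vertexValue, Class<EV> edgeValue) {
        return new GraphTypes<>(key, vertexValue, edgeValue);
    }

    public static void main(String[] args) {
        GraphTypes<Long, NoValue, Double> g = keyAndEdgeTypes(Long.class, Double.class);
        System.out.println(g.vertexValue.getSimpleName() + " / " + g.edgeValue.getSimpleName());
    }
}
```

With distinct names, the return type of each call tells the caller which of the vertex and edge values are absent, without relying on argument order.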


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-125225039
  
    Hi @shghatge, @andralungu,
    
    I built on the current state of this PR and made the proposed changes above, together with some styling changes. You can see the result on my [local branch](https://github.com/vasia/flink/tree/csvInput).
    If you agree with the changes, I will also update the documentation and merge this.
    Thank you!


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-117787551
  
    Nice and rebased. +1


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-130013958
  
    Hi @vasia, 
    
    I'm not sure whether this comment was meant for me; nevertheless, I left some suggestions inline. All in all, it covers the problems discussed in the 73 (!) comments here. You forgot to properly document the `edgeTypes(K, EV)` and related methods.


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32675117
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    --- End diff --
    
    When I add `MapFunction<K, VV>`, the compiler reports that it cannot resolve the symbols. Is there a way to make this work? Should I add the types to the class as well?
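
One way to read this error: in the diff above, `K` and `VV` are declared only on the `types(...)` method, so they are not in scope where the `mapper` field is declared. Declaring the type parameters on the class makes them visible to fields and constructors, which is what the later revision quoted in this thread does with `GraphCsvReader<K,VV,EV>`. A minimal sketch, assuming `java.util.function.Function` as a stand-in for Flink's `MapFunction` and a hypothetical `ReaderSketch` class:

```java
import java.util.function.Function;

// Hypothetical sketch: Function stands in for Flink's MapFunction,
// so the example compiles without a Flink dependency.
final class ReaderSketch<K, VV> {

    // K and VV resolve here because they are declared on the class,
    // not only on a single generic method.
    private final Function<K, VV> mapper;

    ReaderSketch(Function<K, VV> mapper) {
        this.mapper = mapper;
    }

    // Applies the mapper to a vertex ID to produce that vertex's initial value.
    VV initialValue(K vertexId) {
        return mapper.apply(vertexId);
    }

    public static void main(String[] args) {
        ReaderSketch<Long, String> reader = new ReaderSketch<>(id -> "v" + id);
        System.out.println(reader.initialValue(7L));
    }
}
```

The trade-off is that callers must then fix the types when constructing the reader rather than when calling `types(...)`.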


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33354654
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV>{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction<K, VV> mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    --- End diff --
    
    here it's this.path1 = null; for consistency with the rest.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33354071
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param verticesPath path to a CSV file with the Vertices data.
    +	* @param edgesPath path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    --- End diff --
    
    "on which calling", not "which on ...", or "which, on calling the types method, specifies" (not "to specify")



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32674326
  
    --- Diff: docs/libs/gelly_guide.md ---
    @@ -102,6 +102,15 @@ DataSet<Tuple3<String, String, Double>> edgeTuples = env.readCsvFile("path/to/ed
     Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env);
     {% endhighlight %}
     
    +* from a CSV file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the first CSV file to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the second CSV file will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. A types() method is called on the GraphCsvReader object returned by fromCsvReader() to inform the CsvReader of the types of the fields :
    --- End diff --
    
    oh! Will fix this.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33824099
  
    --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java ---
    @@ -54,16 +75,13 @@ public void testCreateWithoutVertexValues() throws Exception {
     		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     		Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
     
    -        DataSet<Vertex<Long,NullValue>> data = graph.getVertices();
    -        List<Vertex<Long,NullValue>> result= data.collect();
    -        
    +		graph.getVertices().writeAsCsv(resultPath);
    --- End diff --
    
    Oh... I made these changes before that pull request got merged. I will change it now.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32672827
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,58 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param path1 path to a CSV file with the Vertices data.
    +	* @param path2 path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	*		 Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static  GraphCsvReader fromCsvReader(String path1, String path2, ExecutionEnvironment context){
    +		return (new GraphCsvReader(path1,path2,context));
    +	}
    +	/** Creates a graph from a CSV file for Edges., Vertices are
    +	* induced from the edges.
    +	*
    +	* Edges with value are created from a CSV file with 3 fields. Vertices are created
    +	* automatically and their values are set to NullValue.
    +	*
    +	* @param path a path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	* Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    --- End diff --
    
    new line



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33354571
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV>{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    --- End diff --
    
    again, the path1, path2 issue



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-118888598
  
    Updated PR



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32673548
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    --- End diff --
    
    add type arguments to MapFunction



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32672727
  
    --- Diff: docs/libs/gelly_guide.md ---
    @@ -102,6 +102,15 @@ DataSet<Tuple3<String, String, Double>> edgeTuples = env.readCsvFile("path/to/ed
     Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env);
     {% endhighlight %}
     
    +* from a CSV file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the first CSV file to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the second CSV file will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. A types() method is called on the GraphCsvReader object returned by fromCsvReader() to inform the CsvReader of the types of the fields :
    --- End diff --
    
    It seems to me that the first argument is the vertex file and the second is the edges file.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32674944
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	@Deprecated
    +	public GraphCsvReader fieldDelimiter(char delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader fieldDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Enables quoted String parsing. Field delimiters in quoted Strings are ignored.
    +	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
    +	 * Leading or tailing whitespaces are not allowed.
    +	 *
    +	 * @param quoteCharacter The character which is used as quoting character.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +	public GraphCsvReader parseQuotedStrings(char quoteCharacter) {
    +		this.EdgeReader.parseQuotedStrings(quoteCharacter);
    +		this.VertexReader.parseQuotedStrings(quoteCharacter);
    --- End diff --
    
    same



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-113213190
  
    This looks very nice! Someone deserves a virtual :ice_cream: ! 
    
    There are some tests missing: 
    - test `fromCSV` with a Mapper
    - you just test `types`, `ignoreFirstLines` and `ignoreComments`; let's at least add tests for the `lineDelimiter*` and the `fieldDelimiter*` methods. I'm sure they work, but tests are written to guarantee that the functionality will also be there (at the same quality) in the future (i.e. some exotic code addition will not break it) :)
    
    I saw an outdated Vasia comment on an unused import; always hit mvn verify before pushing - it would have caught that :D 



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-121033796
  
    I hadn't realized that they would both need to be called in my previous comment, my bad.
    Any idea for decent method names? `typesNoEdgeValue` and `typesNoVertexValue` seem really ugly to me :S



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-112954221
  
    Hey @shghatge,
    
    this is a great first try, you got the logic right and I really like the detailed javadocs ^^
    I left a few inline comments, which should be easy to fix.
    
    Let me also elaborate a bit on some general guidelines:
    - Code formatting: we don't really have a strict Java code style, but there are a few things you can improve. For your code to be readable, it is nice to leave a space after the commas separating arguments. For example `myMethod(arg1, arg2, arg3)` instead of `myMethod(arg1,arg2,arg3)`.
    We usually separate the closing of a parenthesis and the opening of a curly bracket with a space, i.e. `myMethod() { ... }` instead of  `myMethod(){ ... }`.
    Also, try to avoid adding new lines if they are not needed.
    Regarding the types missing, this is not creating an error, but gives a warning. You can turn on warning notification settings in your IDE to avoid this.
    
    - I like that you added separate `includeFields` methods for vertices and edges. It would probably make sense to do the same for the rest of the methods. For example, you might want to skip the first line in the edges file, but not in the vertices file. Right now, you are forced to either do both or none. Alternatively, we could add parameters to the existing methods, to define the behavior for edges and vertices files separately. For example `public GraphCsvReader lineDelimiter(String vertexDelimiter, String edgeDelimiter)`. What do you think?
    
    - Finally, in order to catch issues like the one with the null `VertexReader`, you should always try to test as much of the functionality you have added as possible. In this case, it would be a good idea to add a test that reads from edges only, plus some tests for the different methods you have added.
    
    Let me know if you have questions!
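
The per-file configuration suggested in the comment above could be sketched roughly as follows. This is only an illustration in plain Java, not the Gelly implementation: `StubCsvReader` is a hypothetical stand-in for Flink's `CsvReader`, and `PerFileGraphCsvReader`, `lineDelimiterEdges` and `lineDelimiterVertices` are names assumed here. Only the two-argument `lineDelimiter` follows the signature proposed in the comment.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's CsvReader; it only records the line delimiter.
final class StubCsvReader {
    private String lineDelimiter = "\n";

    StubCsvReader lineDelimiter(String delimiter) {
        this.lineDelimiter = Objects.requireNonNull(delimiter, "delimiter");
        return this;
    }

    String getLineDelimiter() {
        return lineDelimiter;
    }
}

// Sketch of a graph reader that configures the vertices and edges files independently.
final class PerFileGraphCsvReader {
    private final StubCsvReader vertexReader = new StubCsvReader();
    private final StubCsvReader edgeReader = new StubCsvReader();

    // Option 1: one method per file (assumed names).
    PerFileGraphCsvReader lineDelimiterVertices(String delimiter) {
        vertexReader.lineDelimiter(delimiter);
        return this;
    }

    PerFileGraphCsvReader lineDelimiterEdges(String delimiter) {
        edgeReader.lineDelimiter(delimiter);
        return this;
    }

    // Option 2: a single method that takes one argument per file.
    PerFileGraphCsvReader lineDelimiter(String vertexDelimiter, String edgeDelimiter) {
        return lineDelimiterVertices(vertexDelimiter).lineDelimiterEdges(edgeDelimiter);
    }

    String vertexLineDelimiter() {
        return vertexReader.getLineDelimiter();
    }

    String edgeLineDelimiter() {
        return edgeReader.getLineDelimiter();
    }
}
```

Both options keep the fluent chaining style. The second keeps the method count low, at the cost of callers having to remember the argument order.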




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32676277
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data and the type of Vertex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vertex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	@Deprecated
    +	public GraphCsvReader fieldDelimiter(char delimiter)
    --- End diff --
    
    This one is deprecated. We can remove it.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33355180
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java ---
    @@ -119,23 +113,32 @@ private static boolean parseParameters(String [] args) {
     		return true;
     	}
     
    -	@SuppressWarnings("serial")
    -	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
    -
    -		if(fileOutput) {
    -			return env.readCsvFile(edgeInputPath)
    -					.ignoreComments("#")
    -					.fieldDelimiter("\t")
    -					.lineDelimiter("\n")
    -					.types(Long.class, Long.class)
    -					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
    -						@Override
    -						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
    -							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
    +	private static Graph<Long, Long, NullValue> getGraph(ExecutionEnvironment env)
    +	{
    +		Graph<Long, Long, NullValue> graph;
    +		if(!fileOutput)
    +		{
    +			DataSet<Edge<Long, NullValue>> edges = ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
    --- End diff --
    
    Let's also keep this consistent. In Single Source Shortest Paths you read `fromDataSet(getDefault..., env)`. Maybe we could do that for all the examples.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32675011
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data and the type of Vertex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vertex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	@Deprecated
    +	public GraphCsvReader fieldDelimiter(char delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader fieldDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Enables quoted String parsing. Field delimiters in quoted Strings are ignored.
    +	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
    +	 * Leading or tailing whitespaces are not allowed.
    +	 *
    +	 * @param quoteCharacter The character which is used as quoting character.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +	public GraphCsvReader parseQuotedStrings(char quoteCharacter) {
    +		this.EdgeReader.parseQuotedStrings(quoteCharacter);
    +		this.VertexReader.parseQuotedStrings(quoteCharacter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures the string that starts comments.
    +	 * By default comments will be treated as invalid lines.
    +	 * This function only recognizes comments which start at the beginning of the line!
    +	 *
    +	 * @param commentPrefix The string that starts the comments.
    +	 * @return The Graph csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader ignoreComments(String commentPrefix) {
    +		this.EdgeReader.ignoreComments(commentPrefix);
    +		this.VertexReader.ignoreComments(commentPrefix);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
    +	 * array. The parser will skip over all fields where the boolean value at the corresponding position
    +	 * in the array is {@code false}. The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code true}.
    +	 * The number of fields in the result is consequently equal to the number of times that {@code true}
    +	 * occurs in the fields array.
    +	 *
    +	 * @param vertexFields The array of flags that describes which fields are to be included from the CSV file for vertices.
    +	 * @return The CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader includeVertexFields(boolean ... vertexFields) {
    +		this.VertexReader.includeFields(vertexFields);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
    +	 * array. The parser will skip over all fields where the boolean value at the corresponding position
    +	 * in the array is {@code false}. The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code true}.
    +	 * The number of fields in the result is consequently equal to the number of times that {@code true}
    +	 * occurs in the fields array.
    +	 *
    +	 * @param edgeFields The array of flags that describes which fields are to be included from the CSV file for edges.
    +	 * @return The CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +	public GraphCsvReader includeEdgeFields(boolean ... edgeFields) {
    +		this.EdgeReader.includeFields(edgeFields);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * positions in the string (read from position 0 to its length) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string
    +	 * The parser will skip over all fields where the character at the corresponding position
    +	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
    +	 * {@code false}). The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
    +	 *
    +	 * @param mask The string mask defining which fields to include and which to skip.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeVertexFields(String mask) {
    +		this.VertexReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * positions in the string (read from position 0 to its length) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string
    +	 * The parser will skip over all fields where the character at the corresponding position
    +	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
    +	 * {@code false}). The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
    +	 *
    +	 * @param mask The string mask defining which fields to include and which to skip.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeEdgeFields(String mask) {
    +		this.EdgeReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * bits in the value (read from least significant to most significant) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
    +	 * non-zero bit.
    +	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
    +	 * include the fields where the corresponding bit is one.
    +	 * <p>
    +	 * Examples:
    +	 * <ul>
    +	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
    +	 *   <li>A mask of {@code 0x26} (binary {@code 100110}) would skip the first field, include fields
    +	 *       two and three, skip fields four and five, and include field six.</li>
    +	 * </ul>
    +	 *
    +	 * @param mask The bit mask defining which fields to include and which to skip.
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeVertexFields(long mask) {
    +		this.VertexReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * bits in the value (read from least significant to most significant) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
    +	 * non-zero bit.
    +	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
    +	 * include the fields where the corresponding bit is one.
    +	 * <p>
    +	 * Examples:
    +	 * <ul>
    +	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
    +	 *   <li>A mask of {@code 0x26} (binary {@code 100110}) would skip the first field, include fields
    +	 *       two and three, skip fields four and five, and include field six.</li>
    +	 * </ul>
    +	 *
    +	 * @param mask The bit mask defining which fields to include and which to skip.
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeEdgeFields(long mask) {
    +		this.EdgeReader.includeFields(mask);
    +		return this;
    +	}
    +
    +
    +
    +	/**
    +	 * Sets the CSV readers for the Edges and Vertices files to ignore the first line. This is useful for files that contain a header line.
    +	 *
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader ignoreFirstLine() {
    +		this.EdgeReader.ignoreFirstLine();
    +		this.VertexReader.ignoreFirstLine();
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the CSV readers for the Edges and Vertices files  to ignore any invalid lines.
    +	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
    +	 *
    +	 * @return The CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader ignoreInvalidLines(){
    +		this.EdgeReader.ignoreInvalidLines();
    +		this.VertexReader.ignoreInvalidLines();
    --- End diff --
    
    and this should be the last one ;)
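
The null `VertexReader` issue mentioned in the review arises because the edges-only constructors set `VertexReader` to null while the shared configuration methods call it unconditionally. One possible guard is sketched below in plain Java. It is an assumption about how the fix might look, not the code that was merged: `RecordingReader` is a hypothetical stand-in for Flink's `CsvReader`, and `NullSafeGraphCsvReader` and its accessors are names invented for this illustration.

```java
// Hypothetical stand-in for Flink's CsvReader; it only records one flag.
final class RecordingReader {
    private boolean ignoreInvalidLines = false;

    void ignoreInvalidLines() {
        this.ignoreInvalidLines = true;
    }

    boolean ignoresInvalidLines() {
        return ignoreInvalidLines;
    }
}

// Sketch of a reader whose vertex side is optional.
final class NullSafeGraphCsvReader {
    private final RecordingReader edgeReader = new RecordingReader();
    private final RecordingReader vertexReader; // null when only an edges file is given

    NullSafeGraphCsvReader(boolean hasVertexFile) {
        this.vertexReader = hasVertexFile ? new RecordingReader() : null;
    }

    // Applies the setting to the edges reader, and to the vertices reader only if it exists.
    NullSafeGraphCsvReader ignoreInvalidLines() {
        edgeReader.ignoreInvalidLines();
        if (vertexReader != null) {
            vertexReader.ignoreInvalidLines();
        }
        return this;
    }

    boolean hasVertexReader() {
        return vertexReader != null;
    }

    boolean edgesIgnoreInvalidLines() {
        return edgeReader.ignoresInvalidLines();
    }

    boolean verticesIgnoreInvalidLines() {
        return vertexReader != null && vertexReader.ignoresInvalidLines();
    }
}
```

With this guard, calling `new NullSafeGraphCsvReader(false).ignoreInvalidLines()` configures the edges reader without throwing a `NullPointerException`, which is the edges-only case the review asks to cover with a test.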



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-117728162
  
    Updated PR



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33822653
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV> {
    +
    +	private final Path vertexPath,edgePath;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = vertexPath;
    +		this.edgePath = edgePath;
    +		this.VertexReader = new CsvReader(vertexPath,context);
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context);
    +	}
    +
    +	public CsvReader getEdgeReader() {
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader() {
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2) {
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(type0, type0, type2);
    +		if(vertexPath!=null) {
    +			DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(type0, type1);
    +			return Graph.fromTupleDataSet(vertices, edges, executionContext);
    +		} else {
    +			return Graph.fromTupleDataSet(edges, this.mapper, executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types in the special case
    +	 * where Vertices don't have a value
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public Graph<K, NullValue, EV> typesVertexValueNull(Class<K> type0, Class<EV> type1) {
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(type0, type0, type1);
    +		return Graph.fromTupleDataSet(edges, executionContext);
    +	}
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types ain the special case
    +	 * where Edges don't have a value
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public Graph<K, VV, NullValue> typesEdgeValueNull(Class<K> type0, Class<VV> type1) {
    +		DataSet<Tuple3<K, K, NullValue>> edges = this.EdgeReader.types(type0, type0)
    +				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
    +					@Override
    +					public Tuple3<K, K, NullValue> map(Tuple2<K, K> tuple2) throws Exception {
    +						return new Tuple3<K, K, NullValue>(tuple2.f0, tuple2.f1, NullValue.getInstance());
    +					}
    +				});
    +
    +		if(vertexPath!=null) {
    +			DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(type0, type1);
    +			return Graph.fromTupleDataSet(vertices, edges, executionContext);
    +		} else {
    +			return Graph.fromTupleDataSet(edges, this.mapper, executionContext);
    +		}
    +	}
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types.
    +	 * This method is overloaded for the case in which Vertices and Edges don't have a value
    --- End diff --
    
    Same here. Actually, do you allow reading edges with no values? The docs suggest you don't, but the code here appears to :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-136666651
  
    @vasia  It is fine with me.


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32673806
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    --- End diff --
    
    types missing


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32672805
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,58 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param path1 path to a CSV file with the Vertices data.
    +	* @param path2 path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	*		 Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static  GraphCsvReader fromCsvReader(String path1, String path2, ExecutionEnvironment context){
    +		return (new GraphCsvReader(path1,path2,context));
    --- End diff --
    
    parentheses not needed


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-113217602
  
    Ah! And I just remembered! Maybe it makes sense to update the examples to use `fromCSV` when creating the Graph instead of `getEdgesDataSet`. 


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33823858
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV> {
    +
    +	private final Path vertexPath,edgePath;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = vertexPath;
    +		this.edgePath = edgePath;
    +		this.VertexReader = new CsvReader(vertexPath,context);
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +		this.vertexPath = null;
    +		this.edgePath = edgePath;
    +		this.EdgeReader = new CsvReader(edgePath,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    +		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    +			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context);
    +	}
    +
    +	public CsvReader getEdgeReader() {
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader() {
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2) {
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(type0, type0, type2);
    +		if(vertexPath!=null) {
    +			DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(type0, type1);
    +			return Graph.fromTupleDataSet(vertices, edges, executionContext);
    +		} else {
    +			return Graph.fromTupleDataSet(edges, this.mapper, executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types in the special case
    +	 * where Vertices don't have a value
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public Graph<K, NullValue, EV> typesVertexValueNull(Class<K> type0, Class<EV> type1) {
    +		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(type0, type0, type1);
    +		return Graph.fromTupleDataSet(edges, executionContext);
    +	}
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types ain the special case
    +	 * where Edges don't have a value
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public Graph<K, VV, NullValue> typesEdgeValueNull(Class<K> type0, Class<VV> type1) {
    +		DataSet<Tuple3<K, K, NullValue>> edges = this.EdgeReader.types(type0, type0)
    +				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
    +					@Override
    +					public Tuple3<K, K, NullValue> map(Tuple2<K, K> tuple2) throws Exception {
    +						return new Tuple3<K, K, NullValue>(tuple2.f0, tuple2.f1, NullValue.getInstance());
    +					}
    +				});
    +
    +		if(vertexPath!=null) {
    +			DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(type0, type1);
    +			return Graph.fromTupleDataSet(vertices, edges, executionContext);
    +		} else {
    +			return Graph.fromTupleDataSet(edges, this.mapper, executionContext);
    +		}
    +	}
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types.
    +	 * This method is overloaded for the case in which Vertices and Edges don't have a value
    --- End diff --
    
    Yes. I added two special cases where one could read edges with no value and both vertices and edges with null value. Should I update the gelly_guide for the same? 
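
The edges-without-value case described above can be sketched without Flink on the classpath. This is only an illustration under stated assumptions: `EdgeWithoutValueSketch`, its nested `Edge` class, `NoValue` (a stand-in for Flink's `NullValue`) and `readEdges` are hypothetical names, and the real code in the diff maps a `Tuple2` to a `Tuple3` inside a `MapFunction` instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of reading edges that carry no value.
final class EdgeWithoutValueSketch {

    // Stand-in for Flink's NullValue singleton (assumption for illustration).
    enum NoValue { INSTANCE }

    // Minimal edge triple: source ID, target ID, edge value.
    static final class Edge<K, EV> {
        final K source;
        final K target;
        final EV value;

        Edge(K source, K target, EV value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    // Parses two-field CSV lines ("source,target") and attaches the
    // placeholder value, mirroring the Tuple2 -> Tuple3 mapping in the diff.
    static List<Edge<Long, NoValue>> readEdges(List<String> csvLines) {
        List<Edge<Long, NoValue>> edges = new ArrayList<>();
        for (String line : csvLines) {
            String[] fields = line.split(",");
            long source = Long.parseLong(fields[0].trim());
            long target = Long.parseLong(fields[1].trim());
            edges.add(new Edge<>(source, target, NoValue.INSTANCE));
        }
        return edges;
    }
}
```

Every edge shares the same placeholder instance, so the absence of a value costs no extra allocation per edge.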


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32673063
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,58 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param path1 path to a CSV file with the Vertices data.
    +	* @param path2 path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	*		 Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static  GraphCsvReader fromCsvReader(String path1, String path2, ExecutionEnvironment context){
    --- End diff --
    
    I would rename `path1` and `path2` to something like `verticesPath` and `edgesPath`


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32673274
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    --- End diff --
    
    add type arguments to MapFunction
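
A minimal sketch of what parameterizing the mapper field buys, written without Flink so it compiles on its own. The `MapFunction` interface below is a simplified stand-in (the real Flink interface also declares `throws Exception`), and `TypedMapperSketch` with its `initialValues` method is a hypothetical holder, not code from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's MapFunction (assumption: the checked
// exception of the real interface is dropped to keep the sketch short).
interface MapFunction<T, O> {
    O map(T value);
}

// Hypothetical holder with a parameterized mapper field instead of a raw one.
final class TypedMapperSketch<K, VV> {

    private final MapFunction<K, VV> mapper;

    TypedMapperSketch(MapFunction<K, VV> mapper) {
        this.mapper = mapper;
    }

    // Derives one vertex value per vertex ID. Because the field is typed,
    // no casts and no unchecked warnings are needed here.
    List<VV> initialValues(List<K> vertexIds) {
        List<VV> values = new ArrayList<>();
        for (K id : vertexIds) {
            values.add(mapper.map(id));
        }
        return values;
    }
}
```

With a raw `MapFunction` field, `mapper.map(id)` would return `Object` and the compiler would emit unchecked warnings of the kind the review asks to remove.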


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-118173012
  
    Hi @shghatge! Thank you for the update :)
    
    I left some comments inline. There are still some formatting issues in the code. Please go through your changes carefully and try to be consistent. Also, there are still several warnings regarding types, unused annotations, and unused variables. Can you please try to remove them? Your IDE should have a setting that gives you the list of warnings.
    
    Regarding the tests, better create new test files for your methods, since you need to test with files and currently other tests use `collect()`.
    
    Finally, I find the `types()` methods a bit confusing. Could we maybe have separate types methods for the vertices and edges? e.g. `typesEdges(keyType, valueType)`, `typesEdges(keyType)`, `typesVertices(keyType, valueType)` and `typesVertices(keyType)`?
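
One way to picture the separate type methods proposed above is the Flink-free sketch below. It only records the chosen types and is not the API that was eventually merged: the class name `GraphCsvTypesSketch`, the `describe` method, and the use of `null` to mean "no value" are assumptions made for illustration. Only the four method names come from the comment.

```java
// Hypothetical, Flink-free sketch: records the types chosen for each CSV file.
final class GraphCsvTypesSketch {

    private Class<?> edgeKeyType;
    private Class<?> edgeValueType;   // stays null when edges carry no value
    private Class<?> vertexKeyType;
    private Class<?> vertexValueType; // stays null when vertices carry no value

    GraphCsvTypesSketch typesEdges(Class<?> keyType, Class<?> valueType) {
        this.edgeKeyType = keyType;
        this.edgeValueType = valueType;
        return this;
    }

    GraphCsvTypesSketch typesEdges(Class<?> keyType) {
        return typesEdges(keyType, null);
    }

    GraphCsvTypesSketch typesVertices(Class<?> keyType, Class<?> valueType) {
        this.vertexKeyType = keyType;
        this.vertexValueType = valueType;
        return this;
    }

    GraphCsvTypesSketch typesVertices(Class<?> keyType) {
        return typesVertices(keyType, null);
    }

    // Summarizes the configuration; vertex IDs must use the edge key type.
    String describe() {
        if (edgeKeyType == null) {
            throw new IllegalStateException("typesEdges(...) was not called");
        }
        if (vertexKeyType != null && !vertexKeyType.equals(edgeKeyType)) {
            throw new IllegalStateException("vertex and edge key types differ");
        }
        return "edges(" + name(edgeKeyType) + ", " + name(edgeValueType) + ") "
                + "vertices(" + name(edgeKeyType) + ", " + name(vertexValueType) + ")";
    }

    private static String name(Class<?> type) {
        return type == null ? "no value" : type.getSimpleName();
    }
}
```

A call chain such as `new GraphCsvTypesSketch().typesEdges(Long.class, Double.class).typesVertices(Long.class)` makes it explicit which file each type applies to, which is the readability gain the comment is after.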



---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33354309
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV>{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    --- End diff --
    
    Let's not call these path1 and path2. I suggest we use more descriptive names such as edgePath and vertexPath. The same applies to the methods below.


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-113483077
  
    Mea culpa. No, the mapper test is fine.
    
    For the examples comment, I meant going through the classes in the examples folder and changing the way the graph is currently read. Right now, we fetch the edges via `env.readCsvFile` and then use `Graph.fromDataSet` to create the graph. We should do it directly via `Graph.fromCsv`.
    
    The example in the docs is fine, because it explains how fromDataSet works. That is still available. 


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33355373
  
    --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java ---
    @@ -156,4 +181,17 @@ public DummyCustomType map(Long vertexId) {
     			return new DummyCustomType(vertexId.intValue()-1, false);
     		}
     	}
    +
    +	private FileInputSplit createTempFile(String content) throws IOException {
    +		File tempFile = File.createTempFile("test_contents", "tmp");
    +		tempFile.deleteOnExit();
    --- End diff --
    
    `deleteOnExit()`... nice!
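
For reference, a minimal sketch of such a helper using only the JDK. It returns a plain `File` rather than the `FileInputSplit` used in the diff, and the class name `TempCsvFiles` is made up for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical test helper: writes CSV content to a temporary file that the
// JVM removes on normal shutdown.
class TempCsvFiles {
    static File createTempFile(String content) throws IOException {
        File tempFile = File.createTempFile("test_contents", "tmp");
        // Register the file for deletion when the JVM terminates normally.
        tempFile.deleteOnExit();
        Files.write(tempFile.toPath(), content.getBytes(StandardCharsets.UTF_8));
        return tempFile;
    }
}
```

Note that `deleteOnExit()` only takes effect on a normal JVM shutdown, so files can still be left behind if the test run is killed.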


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32674908
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    --- End diff --
    
    When you create a Graph with an edges file only, your VertexReader will be null and this will give you a NullPointerException :)
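
One way to avoid this is a null check before touching the vertex reader. The sketch below uses hypothetical stand-ins (`ReaderStub`, `GuardedGraphCsvReader`) instead of Flink's `CsvReader`, so it only illustrates the guard, not the actual fix in the PR.

```java
// Hypothetical stand-in for Flink's CsvReader; it only remembers the
// configured line delimiter.
class ReaderStub {
    String delimiter = "\n";

    ReaderStub lineDelimiter(String delimiter) {
        this.delimiter = delimiter;
        return this;
    }
}

// Hypothetical reduced version of GraphCsvReader showing the null guard.
class GuardedGraphCsvReader {
    final ReaderStub edgeReader;
    final ReaderStub vertexReader; // null when only an edges file is given

    GuardedGraphCsvReader(ReaderStub edgeReader, ReaderStub vertexReader) {
        this.edgeReader = edgeReader;
        this.vertexReader = vertexReader;
    }

    GuardedGraphCsvReader lineDelimiter(String delimiter) {
        edgeReader.lineDelimiter(delimiter);
        // Only configure the vertex reader if a vertices file was provided.
        if (vertexReader != null) {
            vertexReader.lineDelimiter(delimiter);
        }
        return this;
    }
}
```

The same guard would be needed in every configuration method that forwards to both readers.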


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32675480
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	@Deprecated
    +	public GraphCsvReader fieldDelimiter(char delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader fieldDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Enables quoted String parsing. Field delimiters in quoted Strings are ignored.
    +	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
    +	 * Leading or tailing whitespaces are not allowed.
    +	 *
    +	 * @param quoteCharacter The character which is used as quoting character.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +	public GraphCsvReader parseQuotedStrings(char quoteCharacter) {
    +		this.EdgeReader.parseQuotedStrings(quoteCharacter);
    +		this.VertexReader.parseQuotedStrings(quoteCharacter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures the string that starts comments.
    +	 * By default comments will be treated as invalid lines.
    +	 * This function only recognizes comments which start at the beginning of the line!
    +	 *
    +	 * @param commentPrefix The string that starts the comments.
    +	 * @return The Graph csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader ignoreComments(String commentPrefix) {
    +		this.EdgeReader.ignoreComments(commentPrefix);
    +		this.VertexReader.ignoreComments(commentPrefix);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
    +	 * array. The parser will skip over all fields where the boolean value at the corresponding position
    +	 * in the array is {@code false}. The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code true}.
    +	 * The number of fields in the result is consequently equal to the number of times that {@code true}
    +	 * occurs in the fields array.
    +	 *
    +	 * @param vertexFields The array of flags that describes which fields are to be included from the CSV file for vertices.
    +	 * @return The CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader includeVertexFields(boolean ... vertexFields) {
    +		this.VertexReader.includeFields(vertexFields);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
    +	 * array. The parser will skip over all fields where the boolean value at the corresponding position
    +	 * in the array is {@code false}. The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code true}.
    +	 * The number of fields in the result is consequently equal to the number of times that {@code true}
    +	 * occurs in the fields array.
    +	 *
    +	 * @param edgeFields The array of flags that describes which fields are to be included from the CSV file for edges.
    +	 * @return The CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +	public GraphCsvReader includeEdgeFields(boolean ... edgeFields) {
    +		this.EdgeReader.includeFields(edgeFields);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * positions in the string (read from position 0 to its length) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string
    +	 * The parser will skip over all fields where the character at the corresponding position
    +	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
    +	 * {@code false}). The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
    +	 *
    +	 * @param mask The string mask defining which fields to include and which to skip.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeVertexFields(String mask) {
    +		this.VertexReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * positions in the string (read from position 0 to its length) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string
    +	 * The parser will skip over all fields where the character at the corresponding position
    +	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
    +	 * {@code false}). The result contains the fields where the corresponding position in
    +	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
    +	 *
    +	 * @param mask The string mask defining which fields to include and which to skip.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeEdgeFields(String mask) {
    +		this.EdgeReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
    +	 * bits in the value (read from least significant to most significant) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
    +	 * non-zero bit.
    +	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
    +	 * include the fields where the corresponding bit is one.
    +	 * <p>
    +	 * Examples:
    +	 * <ul>
    +	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
    +	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
    +	 *       two and three, skip fields four and five, and include field six.</li>
    +	 * </ul>
    +	 *
    +	 * @param mask The bit mask defining which fields to include and which to skip.
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeVertexFields(long mask) {
    +		this.VertexReader.includeFields(mask);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
    +	 * bits in the value (read from least significant to most significant) define whether the field at
    +	 * the corresponding position in the CSV schema should be included.
    +	 * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
    +	 * non-zero bit.
    +	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
    +	 * include the fields where the corresponding bit is one.
    +	 * <p>
    +	 * Examples:
    +	 * <ul>
    +	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
    +	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
    +	 *       two and three, skip fields four and five, and include field six.</li>
    +	 * </ul>
    +	 *
    +	 * @param mask The bit mask defining which fields to include and which to skip.
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader includeEdgeFields(long mask) {
    +		this.EdgeReader.includeFields(mask);
    +		return this;
    +	}
    +
    +
    +
    +	/**
    +	 * Sets the CSV readers for the Edges and Vertices files to ignore the first line. This is useful for files that contain a header line.
    +	 *
    +	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader ignoreFirstLine() {
    +		this.EdgeReader.ignoreFirstLine();
    +		this.VertexReader.ignoreFirstLine();
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the CSV readers for the Edges and Vertices files  to ignore any invalid lines.
    +	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
    +	 *
    +	 * @return The CSV reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader ignoreInvalidLines(){
    +		this.EdgeReader.ignoreInvalidLines();
    +		this.VertexReader.ignoreInvalidLines();
    --- End diff --
    
    Okay. Will add a condition for the VertexReader configuration.


---

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33354597
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.types.NullValue;
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +public class GraphCsvReader<K,VV,EV>{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction<K, VV> mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    --- End diff --
    
    again the bracket issue :)



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r33355251
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java ---
    @@ -150,20 +149,15 @@ private static boolean parseParameters(String[] args) {
     	}
     
     	@SuppressWarnings("serial")
    -	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
    -		if (fileOutput) {
    -			return env.readCsvFile(edgesInputPath)
    -					.lineDelimiter("\n").fieldDelimiter("\t")
    -					.types(Long.class, Long.class).map(
    -							new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
    -
    -								public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
    -									return new Edge<Long, NullValue>(value.f0, value.f1, 
    -											NullValue.getInstance());
    -								}
    -					});
    -		} else {
    -			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
    +	private static Graph<Long, NullValue, NullValue> getGraph(ExecutionEnvironment env) {
    +		if(fileOutput) {
    +			return Graph.fromCsvReader(edgesInputPath, env).lineDelimiterEdges("\n").fieldDelimiterEdges("\t")
    +										.types(Long.class);
    +
    +		}
    +		else
    +		{
    +			return Graph.fromDataSet(ExampleUtils.getRandomEdges(env, NUM_VERTICES), env);
    --- End diff --
    
    Yup, I like how this looks better than the previous rewrites.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32672972
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -282,6 +282,58 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
     	}
     
     	/**
    +	* Creates a graph from CSV files.
    +	*
    +	* Vertices with value are created from a CSV file with 2 fields
    +	* Edges with value are created from a CSV file with 3 fields
    +	* from Tuple3.
    +	*
    +	* @param path1 path to a CSV file with the Vertices data.
    +	* @param path2 path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	*		 Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static  GraphCsvReader fromCsvReader(String path1, String path2, ExecutionEnvironment context){
    +		return (new GraphCsvReader(path1,path2,context));
    +	}
    +	/** Creates a graph from a CSV file for Edges., Vertices are
    +	* induced from the edges.
    +	*
    +	* Edges with value are created from a CSV file with 3 fields. Vertices are created
    +	* automatically and their values are set to NullValue.
    +	*
    +	* @param path a path to a CSV file with the Edges data
    +	* @param context the flink execution environment.
    +	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
    +	* Vertex ID, Vertex Value and Edge value returns a Graph
    +	*/
    +
    +	public static GraphCsvReader fromCsvReader(String path, ExecutionEnvironment context){
    +		return (new GraphCsvReader(path,context));
    --- End diff --
    
    Parentheses here too.
    Also, it's nice to have a space after commas when separating arguments and a space before the curly bracket that defines the start of the method.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-121215203
  
    Hmmm :-? But can you pass NullValue to `types()`? It expects Something.class. Can it be overloaded without type erasure getting in the way?
    
    Anyway... I will let @shghatge take over from here :) 



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32673105
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    --- End diff --
    
    remove new lines



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32676617
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    --- End diff --
    
    I tried adding the function with types, but it gives an error. The methods for the mapper are tested and working, though.



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32673221
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    --- End diff --
    
    `edgePath` and `vertexPath` also seem to be unused



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/847#discussion_r32674967
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.io.CsvReader;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.core.fs.Path;
    +
    +
    +/**
    + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
    + * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
    + * the delimiters (row and field),  the fields that should be included or skipped, and other flags
    + * such as whether to skip the initial line as the header.
    + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
    + */
    +
    +
    +
    +public class GraphCsvReader{
    +
    +	private final Path path1,path2;
    +	private final ExecutionEnvironment executionContext;
    +
    +	private Path edgePath;
    +	private Path vertexPath;
    +	protected CsvReader EdgeReader;
    +	protected CsvReader VertexReader;
    +	protected MapFunction mapper;
    +
    +//--------------------------------------------------------------------------------------------------------------------
    +
    +	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1 = path1;
    +		this.path2 = path2;
    +		this.VertexReader = new CsvReader(path1,context);
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.mapper=null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = null;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +		this.path1=null;
    +		this.path2 = path2;
    +		this.EdgeReader = new CsvReader(path2,context);
    +		this.VertexReader = null;
    +		this.mapper = mapper;
    +		this.executionContext=context;
    +	}
    +
    +	public GraphCsvReader (String path2,ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +
    +	}
    +
    +	public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
    +	{
    +		this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
    +	}
    +
    +
    +	public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context)
    +	{
    +
    +			this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context);
    +
    +
    +	}
    +
    +	public CsvReader getEdgeReader()
    +	{
    +		return this.EdgeReader;
    +	}
    +
    +	public CsvReader getVertexReader()
    +	{
    +		return this.VertexReader;
    +	}
    +	//--------------------------------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data  and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph.
    +	 * @param  type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data.
    +	 */
    +	public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2);
    +		if(path1!=null)
    +		{
    +			DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1);
    +			return Graph.fromTupleDataSet(vertices,edges,executionContext);
    +		}
    +		else
    +		{
    +			return Graph.fromTupleDataSet(edges,mapper,executionContext);
    +		}
    +
    +
    +	}
    +	/**
    +	 * Specifies the types for the Graph fields and returns a Graph with those field types
    +	 *
    +	 * This method is overloaded for the case in which Vertices don't have a value
    +	 *
    +	 * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph.
    +	 * @param  type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph.
    +	 * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value.
    +	 */
    +	public <K, EV> Graph<K,NullValue,EV> types(Class<K> type0, Class<EV> type1)
    +	{
    +		DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type1);
    +		return Graph.fromTupleDataSet(edges,executionContext);
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices
    +	 *	({@code '\n'}) is used by default.
    +	 *
    +	 *@param delimiter The delimiter that separates the rows.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader lineDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.lineDelimiter(delimiter);
    +		this.VertexReader.lineDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	@Deprecated
    +	public GraphCsvReader fieldDelimiter(char delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +
    +	/**
    +	 *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices
    +	 * ({@code ','}) is used by default.
    +	 *
    +	 * @param delimiter The delimiter that separates the fields in a row.
    +	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
    +	 */
    +	public GraphCsvReader fieldDelimiter(String delimiter)
    +	{
    +		this.EdgeReader.fieldDelimiter(delimiter);
    +		this.VertexReader.fieldDelimiter(delimiter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Enables quoted String parsing. Field delimiters in quoted Strings are ignored.
    +	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
    +	 * Leading or tailing whitespaces are not allowed.
    +	 *
    +	 * @param quoteCharacter The character which is used as quoting character.
    +	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +	public GraphCsvReader parseQuotedStrings(char quoteCharacter) {
    +		this.EdgeReader.parseQuotedStrings(quoteCharacter);
    +		this.VertexReader.parseQuotedStrings(quoteCharacter);
    +		return this;
    +	}
    +
    +	/**
    +	 * Configures the string that starts comments.
    +	 * By default comments will be treated as invalid lines.
    +	 * This function only recognizes comments which start at the beginning of the line!
    +	 *
    +	 * @param commentPrefix The string that starts the comments.
    +	 * @return The Graph csv reader instance itself, to allow for fluent function chaining.
    +	 */
    +
    +
    +	public GraphCsvReader ignoreComments(String commentPrefix) {
    +		this.EdgeReader.ignoreComments(commentPrefix);
    +		this.VertexReader.ignoreComments(commentPrefix);
    --- End diff --
    
    here too!



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-121025189
  
    Hi @andralungu,
    
    do you mean support for POJOs as vertex / edge values?
    I guess that's a limitation we can't easily overcome, I agree.
    Still though, a nicely designed `fromCsv()` method would simplify the common case.
    
    As for the examples, I don't like what they currently look like in this PR either. However, that's not a problem of `fromCsv()`. The if-block can be easily simplified by changing `getDefaultEdgeDataSet` to `getDefaultGraph`. The else-block looks longer because of the mapper, which, in the current examples is in the main method.
    
    What I think is quite problematic are the `types()` methods. Ideally, we would have the following:
    1. `types(K)` : no vertex value, no edge value
    2. `types(K, VV)`: no edge value
    3. `types(K, EV)`: no vertex value
    4. `types(K, VV, EV)`: both vertex and edge values are present
    However, because of type erasure, we can't have both 2 and 3. The current implementation (having separate `typesEdges` and `typesVertices`) means that both should always be called, even if not necessary. Another way would be to give 2 and 3 different names... So far I haven't been able to come up with a nice solution. Ideas?
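
To make the clash between cases 2 and 3 concrete, here is a small self-contained sketch. `ErasureDemo` is a hypothetical class, not part of Gelly, and the methods return a `String` instead of a `Graph` purely for illustration. It declares case 2 and uses reflection to show the parameter types that remain at runtime.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

final class ErasureDemo {
    // Case 2 from the list above: vertex values, no edge values.
    static <K, VV> String types(Class<K> keyType, Class<VV> vertexValueType) {
        return "K=" + keyType.getSimpleName() + ", VV=" + vertexValueType.getSimpleName();
    }

    // Case 3 cannot be declared next to case 2 under the same name:
    //   static <K, EV> String types(Class<K> keyType, Class<EV> edgeValueType)
    // javac rejects it, because the two declarations differ only in the names
    // of their type variables and both erase to types(Class, Class).

    // Returns the erased parameter types of the only method named "types".
    static String erasedSignature() {
        for (Method m : ErasureDemo.class.getDeclaredMethods()) {
            if (m.getName().equals("types")) {
                return Arrays.toString(m.getParameterTypes());
            }
        }
        throw new IllegalStateException("types method not found");
    }
}
```

Calling `ErasureDemo.erasedSignature()` reports two plain `java.lang.Class` parameters, so nothing is left to tell a vertex value type from an edge value type when the overload is resolved.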



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by shghatge <gi...@git.apache.org>.
Github user shghatge commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-113107660
  
    Hello @vasia 
    I will follow the guidelines and add the tests you suggested in my next commit.
    As for the separate configuration methods, my thinking was that if we want to configure the readers separately, we could use the get methods for the CsvReaders and then configure them. But I will add the separate methods now.
    
    Thanks for the detailed guidance.  :)



[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/847#issuecomment-123402206
  
    I see your point @shghatge. 
    However, I think naming just one method differently will be confusing.
    If we're going to have custom method names, let's go with @andralungu's suggestion above and make sure we document these properly.
    I would prefer a bit shorter method names though.
    How about:
    1). `keyType(K)`
    2). `vertexTypes(K, VV)`
    3). `edgeTypes(K, EV)`
    4). `types(K, VV, EV)`
    ?
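
As a rough sketch of how the four names above could sit side by side without clashing: `TypedGraphReaderSketch` below is a hypothetical stand-in, and each method returns a description string where the real reader would presumably return a `Graph` with `NullValue` for any missing value type.

```java
final class TypedGraphReaderSketch {
    // 1) no vertex value, no edge value
    <K> String keyType(Class<K> k) {
        return describe(k, null, null);
    }

    // 2) vertex value only
    <K, VV> String vertexTypes(Class<K> k, Class<VV> vv) {
        return describe(k, vv, null);
    }

    // 3) edge value only
    <K, EV> String edgeTypes(Class<K> k, Class<EV> ev) {
        return describe(k, null, ev);
    }

    // 4) both vertex and edge values
    <K, VV, EV> String types(Class<K> k, Class<VV> vv, Class<EV> ev) {
        return describe(k, vv, ev);
    }

    // Builds a textual stand-in for the resulting graph type;
    // a missing value type is shown as NullValue.
    private static String describe(Class<?> k, Class<?> vv, Class<?> ev) {
        String vertexValue = (vv == null) ? "NullValue" : vv.getSimpleName();
        String edgeValue = (ev == null) ? "NullValue" : ev.getSimpleName();
        return "Graph<" + k.getSimpleName() + ", " + vertexValue + ", " + edgeValue + ">";
    }
}
```

Because 2) and 3) now have different names, both can take two `Class` arguments and the compiler no longer has to distinguish them by their generic parameters.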

