Posted to issues@flink.apache.org by vasia <gi...@git.apache.org> on 2015/10/02 11:50:12 UTC

[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

GitHub user vasia opened a pull request:

    https://github.com/apache/flink/pull/1211

    [FLINK-2561] Adds gelly-scala examples

    This PR addresses the remaining issues of FLINK-2561.
    It adds three gelly-scala examples: a vertex-centric SSSP, a GSA SSSP, and one showing how to use library methods (connected components).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vasia/flink gelly-scala-example

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1211
    
----
commit 836e3959107c0bebe45888fccdb27c1e87d1011e
Author: vasia <va...@apache.org>
Date:   2015-10-01T20:26:25Z

    [FLINK-2561] [gelly] add gelly-scala examples: vertex-centric SSSP, GSA SSSP
    and how to use a library method (connected components).

commit 9d3ca468609228c4b0764058e4164995610b0d9c
Author: vasia <va...@apache.org>
Date:   2015-10-02T08:29:04Z

    [gelly] fix parameters order in creation methods to be consistent with the Java API

commit 2b4063bad223b2aefd0134f4dfbba3d83fd08933
Author: vasia <va...@apache.org>
Date:   2015-10-02T09:08:34Z

    [gelly] style corrections

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1211#issuecomment-146200483
  
    The examples look good. 
    But the indentation of the Scala code is quite different from the remaining Flink Scala code. Is the remaining Gelly Scala code formatted in the same way?



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1211#issuecomment-146245763
  
    @fhueske do you want to take another look and see if anything looks off now? Thanks!



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41410709
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.api.common.functions.MapFunction
    +import scala.collection.JavaConversions._
    +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
    +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
    +import org.apache.flink.graph.gsa.GatherFunction
    +import org.apache.flink.graph.gsa.Neighbor
    +import org.apache.flink.graph.gsa.SumFunction
    +import org.apache.flink.graph.gsa.ApplyFunction
    +
    +/**
    + * This example shows how to use Gelly's gather-sum-apply iterations.
    + * 
    + * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
    + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
    + *
    + * If no parameters are provided, the program is run with default data from
    + * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
    + */
    +object GSASingleSourceShortestPaths {
    +  def main(args: Array[String]) {
    +    if (!parseParameters(args)) {
    +      return
    +    }
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
    +    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
    +
    +    // Execute the gather-sum-apply iteration
    +    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
    +      new UpdateDistance, maxIterations)
    +
    +    // Extract the vertices as the result
    +    val singleSourceShortestPaths = result.getVertices
    +
    +    // emit result
    +    if (fileOutput) {
    +      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
    +      env.execute("GSA Single Source Shortest Paths Example")
    +    } else {
    +      singleSourceShortestPaths.print()
    +    }
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  Single Source Shortest Path UDFs
    +  // --------------------------------------------------------------------------------------------
    +
    +  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
    +
    +    override def map(id: Long) = {
    +      if (id.equals(srcId)) {
    +        0.0
    +      } else {
    +        Double.PositiveInfinity
    +      }
    +    }
    +  }
    +
    +  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
    +    override def gather(neighbor: Neighbor[Double, Double]) = {
    +      neighbor.getNeighborValue + neighbor.getEdgeValue
    +    }
    +  }
    +
    +  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
    +    override def sum(newValue: Double, currentValue: Double) = {
    +      Math.min(newValue, currentValue)
    +    }
    +  }
    +
    +  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
    +    override def apply(newDistance: Double, oldDistance: Double) = {
    +      if (newDistance < oldDistance) {
    +        setResult(newDistance)
    +      }
    +    }
    +  }
    +
    +  // **************************************************************************
    +  // UTIL METHODS
    +  // **************************************************************************
    +
    +  private var fileOutput = false
    +  private var srcVertexId = 1L
    +  private var edgesInputPath: String = null
    +  private var outputPath: String = null
    +  private var maxIterations = 5
    +
    +  private def parseParameters(args: Array[String]): Boolean = {
    +    if(args.length > 0) {
    +      if(args.length != 4) {
    +        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
    +          " <input edges path> <output path> <num iterations>")
    +        false
    +      }
    +      fileOutput = true
    +      srcVertexId = args(0).toLong
    +      edgesInputPath = args(1)
    +      outputPath = args(2)
    +      maxIterations = (3).toInt
    +      } else {
    +        System.out.println("Executing Single Source Shortest Paths example "
    +          + "with default parameters and built-in default data.")
    +        System.out.println("  Provide parameters to read input data from files.")
    +        System.out.println("  See the documentation for the correct format of input files.")
    +        System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
    +          " <input edges path> <output path> <num iterations>");
    +      }
    +    true
    +  }
    +
    +  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
    +    if (fileOutput) {
    +      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
    +        lineDelimiter = "\n",
    +        fieldDelimiter = "\t")
    +        .map(new Tuple3ToEdgeMap[Long, Double]())
    +     } else {
    --- End diff --
    
    off by 1 space?
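
The gather-sum-apply steps in the quoted example can be sketched on plain Scala collections, without Flink. This is only an illustration under stated assumptions, not the Gelly implementation: WeightedEdge and shortestPaths are hypothetical names, the graph fits in memory, and each vertex gathers candidate distances along its incoming edges. The three phases mirror CalculateDistances, ChooseMinDistance and UpdateDistance from the diff.

```scala
object GsaSsspSketch {
  // Hypothetical local stand-in for Gelly's Edge[Long, Double].
  final case class WeightedEdge(src: Long, trg: Long, dist: Double)

  def shortestPaths(
      edges: Seq[WeightedEdge],
      srcId: Long,
      maxIterations: Int): Map[Long, Double] = {
    val vertexIds = edges.flatMap(e => Seq(e.src, e.trg)).distinct
    // Same initialisation as InitVertices: 0.0 for the source, infinity elsewhere.
    var dist: Map[Long, Double] = vertexIds
      .map(id => id -> (if (id == srcId) 0.0 else Double.PositiveInfinity))
      .toMap

    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      // Gather: every edge offers its target a candidate distance
      // (neighbor value + edge value).
      val candidates = edges.map(e => e.trg -> (dist(e.src) + e.dist))
      // Sum: keep the minimum candidate per target vertex.
      val best = candidates.groupBy(_._1).map {
        case (id, cs) => id -> cs.map(_._2).min
      }
      // Apply: overwrite the vertex value only if the candidate is smaller.
      val updated = dist.map { case (id, old) =>
        val candidate = best.getOrElse(id, Double.PositiveInfinity)
        id -> (if (candidate < old) candidate else old)
      }
      changed = updated != dist
      dist = updated
      iteration += 1
    }
    dist
  }

  def main(args: Array[String]): Unit = {
    // The two edges from the example's doc comment: 1-2 (0.1) and 1-3 (1.4).
    val edges = Seq(WeightedEdge(1L, 2L, 0.1), WeightedEdge(1L, 3L, 1.4))
    shortestPaths(edges, srcId = 1L, maxIterations = 5)
      .toSeq.sortBy(_._1).foreach(println)
  }
}
```

The loop stops early once no distance changes, which is a simplification chosen for the sketch; the quoted example passes maxIterations to runGatherSumApplyIteration instead.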



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41410322
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
    +import org.apache.flink.api.common.functions.MapFunction
    +import org.apache.flink.graph.library.GSAConnectedComponents
    +import java.lang.Long
    +
    +/**
    + * This example shows how to use Gelly's library methods.
    + * You can find all available library methods in [[org.apache.flink.graph.library]]. 
    + * 
    + * In particular, this example uses the
    + * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
    + * library method to compute the connected components of the input graph.
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\n1\t3\n</code> defines two edges,
    + * 1-2 and 1-3.
    + *
    + * Usage {{
    + *   ConnectedComponents <edge path> <result path> <number of iterations>
    + *   }}
    + * If no parameters are provided, the program is run with default data from
    + * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
    + */
    +object ConnectedComponents {
    +  def main(args: Array[String]) {
    +    if (!parseParameters(args)) {
    +      return
    +    }
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
    +    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
    +
    +    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
    +
    +
    +    // emit result
    +    if (fileOutput) {
    +      components.writeAsCsv(outputPath, "\n", ",")
    +      env.execute("Connected Components Example")
    +    } else {
    +      components.print()
    +    }
    +  }
    +
    +  private final class InitVertices extends MapFunction[Long, Long] {
    +    override def map(id: Long) = {id}
    +  }
    +
    +  // ***********************************************************************
    +  // UTIL METHODS
    +  // ***********************************************************************
    +
    +    private var fileOutput = false
    +    private var edgesInputPath: String = null
    +    private var outputPath: String = null
    +    private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
    +
    +    private def parseParameters(args: Array[String]): Boolean = {
    +      if(args.length > 0) {
    +        if(args.length != 3) {
    +          System.err.println("Usage ConnectedComponents <edge path> <output path> " +
    +            "<num iterations>")
    +          false
    +        }
    +        fileOutput = true
    +        edgesInputPath = args(0)
    +        outputPath = args(1)
    +        maxIterations = (2).toInt
    +      } else {
    +        System.out.println("Executing ConnectedComponents example with default parameters" +
    +            " and built-in default data.")
    +        System.out.println("  Provide parameters to read input data from files.")
    +        System.out.println("  See the documentation for the correct format of input files.")
    +        System.out.println("Usage ConnectedComponents <edge path> <output path> " +
    +            "<num iterations>");
    +      }
    +      true
    +    }
    +
    +    private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
    +      if (fileOutput) {
    +        env.readCsvFile[(Long, Long)](edgesInputPath,
    +          lineDelimiter = "\n",
    +          fieldDelimiter = "\t")
    +          .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
    +      } else {
    +        val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
    +        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
    --- End diff --
    
    indent this?
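
For readers unfamiliar with what the library call computes, connected components can be sketched as minimum-label propagation on plain Scala collections. This is an illustration only, not the code of GSAConnectedComponents: the object and method names are hypothetical, and treating every edge as undirected is an assumption of the sketch. As in the quoted InitVertices, each vertex starts with its own ID as its component label.

```scala
object ConnectedComponentsSketch {
  def components(edges: Seq[(Long, Long)], maxIterations: Int): Map[Long, Long] = {
    // Assumption of this sketch: edges are undirected, so add the reverse of each.
    val undirected = edges ++ edges.map(_.swap)
    // Every vertex starts in its own component, labelled with its own ID.
    var label: Map[Long, Long] =
      undirected.map(_._1).distinct.map(id => id -> id).toMap

    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      // For each vertex, find the smallest label among its neighbours.
      val smallestNeighbour = undirected.groupBy(_._2).map {
        case (id, incoming) => id -> incoming.map(e => label(e._1)).min
      }
      // Adopt the neighbour's label only if it is smaller than the current one.
      val updated = label.map { case (id, old) =>
        id -> math.min(old, smallestNeighbour.getOrElse(id, old))
      }
      changed = updated != label
      label = updated
      iteration += 1
    }
    label
  }

  def main(args: Array[String]): Unit = {
    // Edges 1-2 and 1-3 from the doc comment, plus a separate pair 4-5.
    components(Seq((1L, 2L), (1L, 3L), (4L, 5L)), maxIterations = 10)
      .toSeq.sortBy(_._1).foreach(println)
  }
}
```

Vertices that end up with the same label belong to the same component.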



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41389613
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
    +import org.apache.flink.api.common.functions.MapFunction
    +import org.apache.flink.graph.library.GSAConnectedComponents
    +import java.lang.Long
    +
    +/**
    + * This example shows how to use Gelly's library methods.
    + * You can find all available library methods in [[org.apache.flink.graph.library]]. 
    + * 
    + * In particular, this example uses the
    + * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
    + * library method to compute the connected components of the input graph.
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\n1\t3\n</code> defines two edges,
    + * 1-2 with and 1-3.
    + *
    + * Usage {{
    + *   ConnectedComponents <edge path> <result path> <number of iterations>
    + *   }}
    + * If no parameters are provided, the program is run with default data from
    + * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
    + */
    +object ConnectedComponents {
    +  def main(args: Array[String]) {
    +    if (!parseParameters(args)) {
    +      return
    +      }
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
    +    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
    +
    +    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
    +
    +
    +    // emit result
    +    if (fileOutput) {
    +      components.writeAsCsv(outputPath, "\n", ",")
    +      env.execute("Connected Components Example")
    +      } else {
    +        components.print()
    +        }
    +    }
    +
    +  private final class InitVertices extends MapFunction[Long, Long] {
    +    override def map(id: Long) = {id}
    +  }
    +
    +  // ***********************************************************************
    +  // UTIL METHODS
    +  // ***********************************************************************
    +
    +    private var fileOutput = false
    +    private var edgesInputPath: String = null
    +    private var outputPath: String = null
    +    private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
    +
    +    private def parseParameters(args: Array[String]): Boolean = {
    +      if(args.length > 0) {
    +        if(args.length != 3) {
    +          System.err.println("Usage ConnectedComponents <edge path> <output path> " +
    +              "<num iterations>")
    +              false
    +              }
    +        fileOutput = true
    +        edgesInputPath = args(0)
    +        outputPath = args(1)
    +        maxIterations = (2).toInt
    +        } else {
    +          System.out.println("Executing ConnectedComponents example with default parameters" +
    +              " and built-in default data.")
    +          System.out.println("  Provide parameters to read input data from files.")
    +          System.out.println("  See the documentation for the correct format of input files.")
    +          System.out.println("Usage ConnectedComponents <edge path> <output path> " +
    +              "<num iterations>");
    +          }
    +      true
    +      }
    +
    +    private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
    +      if (fileOutput) {
    +        env.readCsvFile[(Long, Long)](edgesInputPath,
    +            lineDelimiter = "\n",
    +            fieldDelimiter = "\t")
    +            .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
    +       } else {
    --- End diff --
    
    Indentation is off
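
Apart from the indentation, two details of the quoted parseParameters are worth noting. The bare `false` inside the inner `if` is an expression whose value is discarded, so the method carries on and still returns true. And `(2).toInt` evaluates to the literal 2 rather than parsing args(2). One possible way to avoid both, sketched below, is to pattern-match on the argument array. Config, Usage and the Either-based return type are hypothetical choices for this sketch, not part of the PR.

```scala
object ParseParametersSketch {
  // Hypothetical container for the parsed arguments.
  final case class Config(edgesInputPath: String, outputPath: String, maxIterations: Int)

  val Usage: String =
    "Usage ConnectedComponents <edge path> <output path> <num iterations>"

  /**
   * Right(None)      : no arguments, run with built-in default data
   * Right(Some(cfg)) : three well-formed arguments
   * Left(message)    : wrong argument count or a non-numeric iteration count
   */
  def parseParameters(args: Array[String]): Either[String, Option[Config]] =
    args match {
      case Array() =>
        Right(None)
      case Array(edgesPath, outputPath, iterations) =>
        // Parse the third argument itself, not the literal index.
        scala.util.Try(iterations.toInt).toOption match {
          case Some(n) => Right(Some(Config(edgesPath, outputPath, n)))
          case None    => Left(s"<num iterations> must be an integer. $Usage")
        }
      case _ =>
        Left(Usage)
    }

  def main(args: Array[String]): Unit =
    parseParameters(args) match {
      case Left(message)    => System.err.println(message)
      case Right(None)      => println("Using built-in default data.")
      case Right(Some(cfg)) => println(s"Reading edges from ${cfg.edgesInputPath}")
    }
}
```

Keeping the mutable fields of the original and adding an explicit `return false` would be a smaller change with the same effect on control flow.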



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1211



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1211#issuecomment-146237334
  
    The formatting in the rest of the gelly-scala code looks OK. This one got messed up as I copied some methods from the Java examples. I'll fix the formatting and merge this one, too. Thanks a lot!



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1211#issuecomment-146258426
  
    I believe they're all fixed now!



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41411193
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.api.common.functions.MapFunction
    +import org.apache.flink.graph.spargel.VertexUpdateFunction
    +import org.apache.flink.graph.spargel.MessageIterator
    +import org.apache.flink.graph.Vertex
    +import org.apache.flink.graph.spargel.MessagingFunction
    +import scala.collection.JavaConversions._
    +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
    +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
    +
    +/**
    + * This example shows how to use Gelly's vertex-centric iterations.
    + * 
    + * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
    + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
    + *
    + * If no parameters are provided, the program is run with default data from
    + * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
    + */
    +object SingleSourceShortestPaths {
    +  def main(args: Array[String]) {
    +    if (!parseParameters(args)) {
    +      return
    +    }
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
    +    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
    +
    +    // Execute the vertex-centric iteration
    +    val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
    +      new MinDistanceMessenger, maxIterations)
    +
    +    // Extract the vertices as the result
    +    val singleSourceShortestPaths = result.getVertices
    +
    +    // emit result
    +    if (fileOutput) {
    +      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
    +      env.execute("Single Source Shortest Paths Example")
    +     } else {
    +        singleSourceShortestPaths.print()
    +     }
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  Single Source Shortest Path UDFs
    +  // --------------------------------------------------------------------------------------------
    +
    +  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
    +
    +    override def map(id: Long) = {
    +      if (id.equals(srcId)) {
    +        0.0
    +      } else {
    +        Double.PositiveInfinity
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Function that updates the value of a vertex by picking the minimum
    +   * distance from all incoming messages.
    +   */
    +  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
    +
    +    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
    +      var minDistance = Double.MaxValue
    +      while (inMessages.hasNext) {
    +        var msg = inMessages.next
    +        if (msg < minDistance) {
    +          minDistance = msg
    +        }
    +      }
    +      if (vertex.getValue > minDistance) {
    +        setNewVertexValue(minDistance)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Distributes the minimum distance associated with a given vertex among all
    +   * the target vertices summed up with the edge's value.
    +   */
    +  private final class MinDistanceMessenger extends
    +    MessagingFunction[Long, Double, Double, Double] {
    +
    +    override def sendMessages(vertex: Vertex[Long, Double]) {
    +      for (edge: Edge[Long, Double] <- getEdges) {
    +        sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
    +      }
    +    }
    +  }
    +
    +  // ****************************************************************************
    +  // UTIL METHODS
    +  // ****************************************************************************
    +
    +  private var fileOutput = false
    +  private var srcVertexId = 1L
    +  private var edgesInputPath: String = null
    +  private var outputPath: String = null
    +  private var maxIterations = 5
    +
    +  private def parseParameters(args: Array[String]): Boolean = {
    +    if(args.length > 0) {
    +      if(args.length != 4) {
    +        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
    +          " <input edges path> <output path> <num iterations>")
    +        false
    +      }
    +    fileOutput = true
    --- End diff --
    
    indentation +2 spaces?
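
The vertex-centric pattern used by VertexDistanceUpdater and MinDistanceMessenger can likewise be sketched on plain Scala collections. This is a rough illustration under assumptions, not Gelly's runtime: WeightedEdge and shortestPaths are hypothetical names, all vertices are taken to be active in the first superstep, and afterwards only vertices whose value changed send messages.

```scala
object VertexCentricSsspSketch {
  // Hypothetical local stand-in for Gelly's Edge[Long, Double].
  final case class WeightedEdge(src: Long, trg: Long, dist: Double)

  def shortestPaths(
      edges: Seq[WeightedEdge],
      srcId: Long,
      maxIterations: Int): Map[Long, Double] = {
    val vertexIds = edges.flatMap(e => Seq(e.src, e.trg)).distinct
    var dist: Map[Long, Double] = vertexIds
      .map(id => id -> (if (id == srcId) 0.0 else Double.PositiveInfinity))
      .toMap
    // Assumption: every vertex is active in the first superstep.
    var active: Set[Long] = vertexIds.toSet

    var superstep = 0
    while (active.nonEmpty && superstep < maxIterations) {
      // Messaging: each active vertex sends (own value + edge value)
      // to the target of every outgoing edge.
      val messages: Map[Long, Seq[Double]] = edges
        .filter(e => active(e.src))
        .map(e => e.trg -> (dist(e.src) + e.dist))
        .groupBy(_._1)
        .map { case (id, ms) => id -> ms.map(_._2) }
      // Update: a vertex takes the minimum incoming message if it beats
      // its current value.
      val improved: Map[Long, Double] = messages.collect {
        case (id, ms) if ms.min < dist(id) => id -> ms.min
      }
      dist = dist ++ improved
      // Only vertices that changed send messages in the next superstep.
      active = improved.keySet
      superstep += 1
    }
    dist
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(WeightedEdge(1L, 2L, 0.1), WeightedEdge(1L, 3L, 1.4))
    shortestPaths(edges, srcId = 1L, maxIterations = 5)
      .toSeq.sortBy(_._1).foreach(println)
  }
}
```

Compared with the gather-sum-apply formulation, the per-vertex minimum is computed inside the update step rather than in a separate pairwise sum function.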



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41410532
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.api.common.functions.MapFunction
    +import scala.collection.JavaConversions._
    +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
    +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
    +import org.apache.flink.graph.gsa.GatherFunction
    +import org.apache.flink.graph.gsa.Neighbor
    +import org.apache.flink.graph.gsa.SumFunction
    +import org.apache.flink.graph.gsa.ApplyFunction
    +
    +/**
    + * This example shows how to use Gelly's gather-sum-apply iterations.
    + * 
    + * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
    + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
    + *
    + * If no parameters are provided, the program is run with default data from
    + * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
    + */
    +object GSASingleSourceShortestPaths {
    +  def main(args: Array[String]) {
    +    if (!parseParameters(args)) {
    +      return
    +    }
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
    +    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
    +
    +    // Execute the gather-sum-apply iteration
    +    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
    +      new UpdateDistance, maxIterations)
    +
    +    // Extract the vertices as the result
    +    val singleSourceShortestPaths = result.getVertices
    +
    +    // emit result
    +    if (fileOutput) {
    +      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
    +      env.execute("GSA Single Source Shortest Paths Example")
    +    } else {
    +      singleSourceShortestPaths.print()
    +    }
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  Single Source Shortest Path UDFs
    +  // --------------------------------------------------------------------------------------------
    +
    +  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
    +
    +    override def map(id: Long) = {
    +      if (id.equals(srcId)) {
    +        0.0
    +      } else {
    +        Double.PositiveInfinity
    +      }
    +    }
    +  }
    +
    +  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
    +    override def gather(neighbor: Neighbor[Double, Double]) = {
    +      neighbor.getNeighborValue + neighbor.getEdgeValue
    +    }
    +  }
    +
    +  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
    +    override def sum(newValue: Double, currentValue: Double) = {
    +      Math.min(newValue, currentValue)
    +    }
    +  }
    +
    +  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
    +    override def apply(newDistance: Double, oldDistance: Double) = {
    +      if (newDistance < oldDistance) {
    +        setResult(newDistance)
    +      }
    +    }
    +  }
    +
    +  // **************************************************************************
    +  // UTIL METHODS
    +  // **************************************************************************
    +
    +  private var fileOutput = false
    +  private var srcVertexId = 1L
    +  private var edgesInputPath: String = null
    +  private var outputPath: String = null
    +  private var maxIterations = 5
    +
    +  private def parseParameters(args: Array[String]): Boolean = {
    +    if(args.length > 0) {
    +      if(args.length != 4) {
    +        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
    +          " <input edges path> <output path> <num iterations>")
    +        false
    +      }
    +      fileOutput = true
    +      srcVertexId = args(0).toLong
    +      edgesInputPath = args(1)
    +      outputPath = args(2)
    +      maxIterations = args(3).toInt
    +      } else {
    --- End diff --
    
    indentation is off



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41389386
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
    +import org.apache.flink.api.common.functions.MapFunction
    +import org.apache.flink.graph.library.GSAConnectedComponents
    +import java.lang.Long
    +
    +/**
    + * This example shows how to use Gelly's library methods.
    + * You can find all available library methods in [[org.apache.flink.graph.library]]. 
    + * 
    + * In particular, this example uses the
    + * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
    + * library method to compute the connected components of the input graph.
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\n1\t3\n</code> defines two edges,
    + * 1-2 with and 1-3.
    + *
    + * Usage {{
    + *   ConnectedComponents <edge path> <result path> <number of iterations>
    + *   }}
    + * If no parameters are provided, the program is run with default data from
    + * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
    + */
    +object ConnectedComponents {
    +  def main(args: Array[String]) {
    +    if (!parseParameters(args)) {
    +      return
    +      }
    --- End diff --
    
    The indentation of the closing bracket is not in sync with Flink's remaining Scala code. It should be on the same level as the opening statement, `if (!parseParameters(args)) {` in this case.
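
For illustration, a minimal sketch of the layout the comment asks for, with the closing bracket in the same column as the `if` that opens the block. The object name `BraceAlignmentSketch` and the one-line `parseParameters` body are hypothetical stand-ins so the snippet compiles on its own; they are not taken from the PR.

```scala
object BraceAlignmentSketch {
  // Hypothetical stand-in for the example's argument parser:
  // accept either no arguments or exactly three.
  def parseParameters(args: Array[String]): Boolean =
    args.isEmpty || args.length == 3

  def main(args: Array[String]): Unit = {
    if (!parseParameters(args)) {
      return
    } // closing bracket sits in the same column as the opening `if`

    println("parameters accepted")
  }
}
```

The same rule would apply to the `} else {` and the final closing bracket of the `if (fileOutput)` block further down in the file.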



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41411055
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.api.common.functions.MapFunction
    +import org.apache.flink.graph.spargel.VertexUpdateFunction
    +import org.apache.flink.graph.spargel.MessageIterator
    +import org.apache.flink.graph.Vertex
    +import org.apache.flink.graph.spargel.MessagingFunction
    +import scala.collection.JavaConversions._
    +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
    +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
    +
    +/**
    + * This example shows how to use Gelly's vertex-centric iterations.
    + * 
    + * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
    + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
    + *
    + * If no parameters are provided, the program is run with default data from
    + * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
    + */
    +object SingleSourceShortestPaths {
    +  def main(args: Array[String]) {
    +    if (!parseParameters(args)) {
    +      return
    +    }
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
    +    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
    +
    +    // Execute the vertex-centric iteration
    +    val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
    +      new MinDistanceMessenger, maxIterations)
    +
    +    // Extract the vertices as the result
    +    val singleSourceShortestPaths = result.getVertices
    +
    +    // emit result
    +    if (fileOutput) {
    +      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
    +      env.execute("Single Source Shortest Paths Example")
    +     } else {
    --- End diff --
    
    off by 1 space



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1211#issuecomment-146250070
  
    Sure, it looks mostly good.
    I found a few lines that need to be changed, though.



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41389493
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
    +import org.apache.flink.api.common.functions.MapFunction
    +import org.apache.flink.graph.library.GSAConnectedComponents
    +import java.lang.Long
    +
    +/**
    + * This example shows how to use Gelly's library methods.
    + * You can find all available library methods in [[org.apache.flink.graph.library]]. 
    + * 
    + * In particular, this example uses the
    + * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
    + * library method to compute the connected components of the input graph.
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\n1\t3\n</code> defines two edges,
    + * 1-2 with and 1-3.
    + *
    + * Usage {{
    + *   ConnectedComponents <edge path> <result path> <number of iterations>
    + *   }}
    + * If no parameters are provided, the program is run with default data from
    + * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
    + */
    +object ConnectedComponents {
    +  def main(args: Array[String]) {
    +    if (!parseParameters(args)) {
    +      return
    +      }
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
    +    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
    +
    +    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
    +
    +
    +    // emit result
    +    if (fileOutput) {
    +      components.writeAsCsv(outputPath, "\n", ",")
    +      env.execute("Connected Components Example")
    +      } else {
    +        components.print()
    +        }
    +    }
    +
    +  private final class InitVertices extends MapFunction[Long, Long] {
    +    override def map(id: Long) = {id}
    +  }
    +
    +  // ***********************************************************************
    +  // UTIL METHODS
    +  // ***********************************************************************
    +
    +    private var fileOutput = false
    +    private var edgesInputPath: String = null
    +    private var outputPath: String = null
    +    private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
    +
    +    private def parseParameters(args: Array[String]): Boolean = {
    +      if(args.length > 0) {
    +        if(args.length != 3) {
    +          System.err.println("Usage ConnectedComponents <edge path> <output path> " +
    +              "<num iterations>")
    +              false
    --- End diff --
    
    `false` should be on the same indentation level as `System.err.println(...)`
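
A sketch of the argument check with `false` aligned as suggested. The object name `UsageCheckSketch` is hypothetical, and the method is trimmed to the length check only. The explicit `return` is an assumption added so the snippet exits early when run on its own; the diff under review has a bare `false` there.

```scala
object UsageCheckSketch {
  // Trimmed-down, hypothetical version of the check under review.
  def parseParameters(args: Array[String]): Boolean = {
    if (args.length > 0) {
      if (args.length != 3) {
        System.err.println("Usage ConnectedComponents <edge path> <output path> " +
          "<num iterations>")
        // Same indentation level as System.err.println(...).
        // `return` is an assumption of this sketch, not part of the PR.
        return false
      }
    }
    true
  }

  def main(args: Array[String]): Unit = {
    println(parseParameters(args))
  }
}
```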



[GitHub] flink pull request: [FLINK-2561] Adds gelly-scala examples

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1211#discussion_r41388362
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala.example;
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph.scala._
    +import org.apache.flink.graph.Edge
    +import org.apache.flink.types.NullValue
    +import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
    +import org.apache.flink.api.common.functions.MapFunction
    +import org.apache.flink.graph.library.GSAConnectedComponents
    +import java.lang.Long
    +
    +/**
    + * This example shows how to use Gelly's library methods.
    + * You can find all available library methods in [[org.apache.flink.graph.library]]. 
    + * 
    + * In particular, this example uses the
    + * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
    + * library method to compute the connected components of the input graph.
    + *
    + * The input file is a plain text file and must be formatted as follows:
    + * Edges are represented by tuples of srcVertexId, trgVertexId which are
    + * separated by tabs. Edges themselves are separated by newlines.
    + * For example: <code>1\t2\n1\t3\n</code> defines two edges,
    + * 1-2 with and 1-3.
    --- End diff --
    
    "1-2 and 1-3"?

