Posted to issues@flink.apache.org by PieterJanVanAeken <gi...@git.apache.org> on 2015/08/10 14:09:00 UTC

[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

GitHub user PieterJanVanAeken opened a pull request:

    https://github.com/apache/flink/pull/1004

    [FLINK-1962] Add Gelly Scala API v2

    Second version of the PR for the Scala Gelly API. For more information, see PR #808.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/PieterJanVanAeken/flink scala-gelly

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1004.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1004
    
----
commit 64400e89676ffc933f761656260b536665f1d398
Author: Pieter-Jan Van Aeken <pi...@euranova.eu>
Date:   2015-08-10T12:06:52Z

    [FLINK-1962] Add Gelly Scala API

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-133730158
  
    I have created [FLINK-2561](https://issues.apache.org/jira/browse/FLINK-2561) for the sync.



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by PieterJanVanAeken <gi...@git.apache.org>.
Github user PieterJanVanAeken commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-132118620
  
    I fixed the unnecessary comment and did a quick rebase to keep up with the master branch.



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-132115823
  
    Looks good to merge except for one cosmetic issue. Awesome job! :)



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by PieterJanVanAeken <gi...@git.apache.org>.
Github user PieterJanVanAeken commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-131156825
  
    I would postpone adding the "completeness check" as it will currently fail. Since I started working on this, the Java Gelly API has changed and while I modified my work to be compatible with the changes, not all new Java Gelly methods have a Scala counterpart yet. It was discussed briefly in the initial PR #808 and it was decided that adding these new methods should go into a separate JIRA issue.



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-132241841
  
    If the APIs are in sync, there should be one documentation for both, right?
    Similar to the other APIs...



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1004



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-132235443
  
    What are the plans for documenting the Gelly Scala API?



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-131158707
  
    Sounds good.
    
    If no one objects in the next days, I would merge this.



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-131169939
  
    +1 from me too. Thanks for your great work @PieterJanVanAeken!



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-131148744
  
    Wow, looks like a good piece of work. Nicely done, with tests and everything.
    Build also passes, style looks good.
    
    +1 to merge this from my side. I'd like to wait for a day or two to get a comment from one of the Gelly people (Vasia or Andra).
    
    One thing, though: In the Batch and streaming APIs, we added a "completeness check" to make sure that methods added to the Java APIs are also present in the Scala APIs. Would that be a good thing to add here as well?
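
A "completeness check" of this kind can be sketched with plain reflection: collect the public method names of the Java class and report those with no same-named method on the Scala class. The sketch below is only an illustration under stated assumptions. `JavaApi`, `ScalaApi`, and `CompletenessCheck` are hypothetical stand-ins, not Flink or Gelly classes, and it compares names only, ignoring signatures.

```scala
// Hypothetical stand-in for a Java API class (not a real Gelly class).
class JavaApi {
  def getVertices(): String = "v"
  def getEdges(): String = "e"
  def numberOfVertices(): Long = 0L
}

// Hypothetical stand-in for the Scala wrapper; it lacks numberOfVertices.
class ScalaApi {
  def getVertices: String = "v"
  def getEdges: String = "e"
}

object CompletenessCheck {
  // Names of the public methods declared directly on the class.
  // Compiler-generated names containing '$' are skipped.
  private def publicMethodNames(c: Class[_]): Set[String] =
    c.getDeclaredMethods
      .filter(m => java.lang.reflect.Modifier.isPublic(m.getModifiers))
      .map(_.getName)
      .filterNot(_.contains("$"))
      .toSet

  // Method names present on the Java class but absent from the Scala class.
  def missingMethods(javaClass: Class[_], scalaClass: Class[_]): Set[String] =
    publicMethodNames(javaClass) -- publicMethodNames(scalaClass)
}
```

A test built on this would assert that `CompletenessCheck.missingMethods(classOf[JavaApi], classOf[ScalaApi])` is empty; with the stand-ins above it reports `numberOfVertices`, which is the kind of failure discussed in this thread while the two APIs are out of sync.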



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by PieterJanVanAeken <gi...@git.apache.org>.
Github user PieterJanVanAeken commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-132252484
  
    As mentioned, they are not in sync yet. We need to create an issue to update the Scala API so that it is in sync with the Java API. A completeness test should also be added once that is done. 
    
    Since this API is just a wrapper, no additional documentation seems needed, but we could add a "java/scala" tab to the documentation, similar to the Batch API documentation. It would look extremely similar though, so I'm not sure how much added value it would have.
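
The wrapper idea can be illustrated in isolation: a Scala class holds the Java-style object, adapts a Scala function to the single-method interface the Java side expects, and re-wraps the result. This is a minimal sketch, not the PR's code. `MapFunction`, `JavaBox`, and `Box` below are hypothetical stand-ins defined only for this example.

```scala
// Hypothetical stand-in for a Java-style single-method interface.
trait MapFunction[I, O] {
  def map(in: I): O
}

// Hypothetical Java-style class that only accepts the interface.
class JavaBox[A](val value: A) {
  def map[B](mapper: MapFunction[A, B]): JavaBox[B] =
    new JavaBox(mapper.map(value))
}

// Scala-facing wrapper: takes a plain function, adapts it, delegates,
// and wraps the Java-style result again.
class Box[A](jbox: JavaBox[A]) {
  def value: A = jbox.value

  def map[B](fun: A => B): Box[B] = {
    val mapper = new MapFunction[A, B] {
      def map(in: A): B = fun(in)
    }
    new Box(jbox.map(mapper))
  }
}
```

For example, `new Box(new JavaBox(20)).map((x: Int) => x + 1).value` evaluates the function through the Java-style `map` and yields a wrapped result. Because each wrapper method mirrors one method of the wrapped class, the two APIs read almost identically, which is the point made above about the documentation.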



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1004#issuecomment-132119301
  
    All right, let's merge this!



[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1004#discussion_r37273914
  
    --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---
    @@ -0,0 +1,735 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala
    +
    +import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.{tuple => jtuple}
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph._
    +import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
    +import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
    +import org.apache.flink.{graph => jg}
    +
    +import _root_.scala.collection.JavaConverters._
    +import _root_.scala.reflect.ClassTag
    +
    +object Graph {
    +  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
    +  TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
    +                              env: ExecutionEnvironment): Graph[K, VV, EV] = {
    +    wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, edges.javaSet, env.getJavaEnv))
    +  }
    +
    +  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
    +  TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
    +  ExecutionEnvironment): Graph[K, VV, EV] = {
    +    wrapGraph(jg.Graph.fromCollection[K, VV, EV](vertices.asJavaCollection, edges
    +      .asJavaCollection, env.getJavaEnv))
    +  }
    +}
    +
    +/**
    + * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
    + * @param jgraph the underlying java api Graph.
    + * @tparam K the key type for vertex and edge identifiers
    + * @tparam VV the value type for vertices
    + * @tparam EV the value type for edges
    + * @see org.apache.flink.graph.Edge
    + * @see org.apache.flink.graph.Vertex
    + */
    +final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
    +TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    +
    +  private[flink] def getWrappedGraph = jgraph
    +
    +
    +  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
    +    if (jgraph.getContext.getConfig.isClosureCleanerEnabled) {
    +      ClosureCleaner.clean(f, checkSerializable)
    +    }
    +    ClosureCleaner.ensureSerializable(f)
    +    f
    +  }
    +
    +  /**
    +   * @return the vertex DataSet.
    +   */
    +  def getVertices = wrap(jgraph.getVertices)
    +
    +  /**
    +   * @return the edge DataSet.
    +   */
    +  def getEdges = wrap(jgraph.getEdges)
    +
    +  /**
    +   * @return the vertex DataSet as Tuple2.
    +   */
    +  def getVerticesAsTuple2(): DataSet[(K, VV)] = {
    +    wrap(jgraph.getVerticesAsTuple2).map(jtuple => (jtuple.f0, jtuple.f1))
    +  }
    +
    +  /**
    +   * @return the edge DataSet as Tuple3.
    +   */
    +  def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
    +    wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each vertex in the graph.
    +   *
    +   * @param mapper the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]):
    +  Graph[K, NV, EV] = {
    +    new Graph[K, NV, EV](jgraph.mapVertices[NV](
    +      mapper,
    +      createTypeInformation[Vertex[K, NV]]
    +    ))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each vertex in the graph.
    +   *
    +   * @param fun the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV] = {
    +    val mapper: MapFunction[Vertex[K, VV], NV] = new MapFunction[Vertex[K, VV], NV] {
    +      val cleanFun = clean(fun)
    +
    +      def map(in: Vertex[K, VV]): NV = cleanFun(in)
    +    }
    +    new Graph[K, NV, EV](jgraph.mapVertices[NV](mapper, createTypeInformation[Vertex[K, NV]]))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each edge in the graph.
    +   *
    +   * @param mapper the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K,
    +    VV, NV] = {
    +    new Graph[K, VV, NV](jgraph.mapEdges[NV](
    +      mapper,
    +      createTypeInformation[Edge[K, NV]]
    +    ))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each edge in the graph.
    +   *
    +   * @param fun the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV] = {
    +    val mapper: MapFunction[Edge[K, EV], NV] = new MapFunction[Edge[K, EV], NV] {
    +      val cleanFun = clean(fun)
    +
    +      def map(in: Edge[K, EV]): NV = cleanFun(in)
    +    }
    +    new Graph[K, VV, NV](jgraph.mapEdges[NV](mapper, createTypeInformation[Edge[K, NV]]))
    +  }
    +
    +  /**
    +   * Joins the vertex DataSet of this graph with an input DataSet and applies
    +   * a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @return a new graph where the vertex values have been updated.
    +   */
    +  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper: MapFunction[
    +    (VV, T), VV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
    +      override def map(value: jtuple.Tuple2[VV, T]): VV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the vertex DataSet of this graph with an input DataSet and applies
    +   * a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @return a new graph where the vertex values have been updated.
    +   */
    +  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV):
    +  Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[VV, T]): VV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on a composite key of both
    +   * source and target and applies a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], mapper: MapFunction[
    +    (EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
    +      scalatuple._2, scalatuple._3)).javaSet
    +    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on a composite key of both
    +   * source and target and applies a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV):
    +  Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
    +      scalatuple._2, scalatuple._3)).javaSet
    +    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the source key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF on
    +   * the resulted values. In case the inputDataSet contains the same key more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
    +  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the source key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF on
    +   * the resulted values. In case the inputDataSet contains the same key more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
    +    EV): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the target key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF on
    +   * the resulted values. Should the inputDataSet contain the same key more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
    +  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the target key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF on
    +   * the resulted values. Should the inputDataSet contain the same key more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
    +    EV): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Apply filtering functions to the graph and return a sub-graph that
    +   * satisfies the predicates for both vertices and edges.
    +   *
    +   * @param vertexFilter the filter function for vertices.
    +   * @param edgeFilter the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: FilterFunction[Edge[K,
    +    EV]]) = {
    +    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
    +  }
    +
    +  /**
    +   * Apply filtering functions to the graph and return a sub-graph that
    +   * satisfies the predicates for both vertices and edges.
    +   *
    +   * @param vertexFilterFun the filter function for vertices.
    +   * @param edgeFilterFun the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean, edgeFilterFun: Edge[K, EV] =>
    +    Boolean) = {
    +    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
    +      val cleanVertexFun = clean(vertexFilterFun)
    +
    +      override def filter(value: Vertex[K, VV]): Boolean = cleanVertexFun(value)
    +    }
    +
    +    val edgeFilter = new FilterFunction[Edge[K, EV]] {
    +      val cleanEdgeFun = clean(edgeFilterFun)
    +
    +      override def filter(value: Edge[K, EV]): Boolean = cleanEdgeFun(value)
    +    }
    +
    +    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the vertices.
    +   *
    +   * @param vertexFilter the filter function for vertices.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]) = {
    +    wrapGraph(jgraph.filterOnVertices(vertexFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the vertices.
    +   *
    +   * @param vertexFilterFun the filter function for vertices.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean) = {
    +    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
    +      val cleanVertexFun = clean(vertexFilterFun)
    +
    +      override def filter(value: Vertex[K, VV]): Boolean = cleanVertexFun(value)
    +    }
    +
    +    wrapGraph(jgraph.filterOnVertices(vertexFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the edges.
    +   *
    +   * @param edgeFilter the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]) = {
    +    wrapGraph(jgraph.filterOnEdges(edgeFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the edges.
    +   *
    +   * @param edgeFilterFun the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean) = {
    +    val edgeFilter = new FilterFunction[Edge[K, EV]] {
    +      val cleanEdgeFun = clean(edgeFilterFun)
    +
    +      override def filter(value: Edge[K, EV]): Boolean = cleanEdgeFun(value)
    +    }
    +
    +    //wrapGraph(jgraph.filterOnEdges(edgeFilter))
    --- End diff --
    
    Maybe unnecessary comment?
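
The diff excerpt stops at the commented-out line, so it does not show whether an active `wrapGraph` call follows it. As general Scala background, a commented-out final expression can matter when the result type is inferred: a block that ends in a definition has type `Unit`. The sketch below uses a hypothetical object and method names, unrelated to Gelly, to show the difference.

```scala
object CommentedOutResult {
  // The result type is inferred from the last expression: Int.
  def withResult(x: Int) = {
    val doubled = x * 2
    doubled
  }

  // With the last expression commented out, the block ends in a
  // definition, so the inferred result type is Unit.
  def withoutResult(x: Int) = {
    val doubled = x * 2
    //doubled
  }
}
```

Declaring the result type explicitly, for example `def withoutResult(x: Int): Int`, would turn this situation into a compile error instead of a silently changed signature.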

