Posted to issues@flink.apache.org by greghogan <gi...@git.apache.org> on 2015/12/16 18:54:13 UTC

[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/1462

    [FLINK-2716] [gelly java scala] New checksum method on DataSet and Graph

    This implementation aggregates using `Object.hashCode`. As noted in FLINK-2716, `TypeComparator` has a hash function, which simply calls `hashCode` for basic types. For composite types (POJOs, tuples, and case classes) the hash is computed over the keyed subset of fields, as noted by @StephanEwen. The differences between `hashCode` and `hash` are immaterial for this use case.
    
    Should this be added to the Python API? I am not finding count() on Python's `DataSet`.
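The aggregation described in this PR (a count of elements plus a sum of `Object.hashCode` values) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the code in the PR: the class names `ChecksumSketch` and `ChecksumHashCode` and the methods `addElement`, `add`, and `of` are hypothetical. It shows why summing hash codes suits a parallel job: the result does not depend on element order, and partial results from separate partitions can simply be added together. Long arithmetic wraps silently on overflow, which is harmless for a checksum.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not the Flink class): count plus sum of Object.hashCode().
public class ChecksumSketch {

    static final class ChecksumHashCode {
        long count;
        long checksum;

        // Fold one element into the running totals.
        void addElement(Object element) {
            count++;
            checksum += element.hashCode();
        }

        // Merge another partial result, e.g. one computed on another partition.
        void add(ChecksumHashCode other) {
            count += other.count;
            checksum += other.checksum;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ChecksumHashCode)) {
                return false;
            }
            ChecksumHashCode that = (ChecksumHashCode) o;
            return count == that.count && checksum == that.checksum;
        }

        @Override
        public int hashCode() {
            return 31 * Long.hashCode(count) + Long.hashCode(checksum);
        }
    }

    // Compute the checksum over a whole in-memory collection.
    static ChecksumHashCode of(List<?> elements) {
        ChecksumHashCode result = new ChecksumHashCode();
        for (Object e : elements) {
            result.addElement(e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("a", "b", "c", "d");
        List<String> shuffled = Arrays.asList("d", "b", "a", "c");

        // Addition is commutative, so element order does not matter.
        System.out.println(of(all).equals(of(shuffled))); // prints true

        // Partial results from two "partitions" merge into the full result.
        ChecksumHashCode merged = of(all.subList(0, 2));
        merged.add(of(all.subList(2, 4)));
        System.out.println(merged.equals(of(all))); // prints true
    }
}
```

The same merge step is what lets a graph checksum be formed by adding the vertex checksum to the edge checksum.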

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 2716_checksum_method_for_dataset_and_graph

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1462.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1462
    
----
commit 8ac28776a81bd70918b872d159f0a21c889d081d
Author: Greg Hogan <co...@greghogan.com>
Date:   2015-12-15T19:25:54Z

    [FLINK-2716] [gelly java scala] New checksum method on DataSet and Graph

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1462


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1462#issuecomment-170863758
  
    Scala code looks good :-)


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1462#issuecomment-171789574
  
    Looks good. One comment about parentheses on the Scala methods, otherwise good.
    
    To save test build time, we could add these tests to some existing test class (like MiscellaneousITCase). This muddies class separation a bit, but saves a lot of build time in the long run, because most of the test time goes into starting the test mini cluster (which happens once per class).


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1462#discussion_r49434439
  
    --- Diff: flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.Utils.Checksum
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +
    +import scala.reflect.ClassTag
    +
    +package object utils {
    +  /**
    +    * This class provides utility methods for computing checksums over a Graph.
    +    *
    +    * @param self Graph
    +    */
    +  implicit class GraphSetUtils[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
    +  TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {
    +
    +    /**
    +      * Computes the checksum over the Graph
    +      *
    +      * @return the checksum over the vertices and edges.
    +      */
    +    @throws(classOf[Exception])
    +    def checksumHashCode[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
    +    TypeInformation : ClassTag]: Checksum = {
    +      val checksum: Checksum = self.getVertices.checksumHashCode
    +      checksum.add(self.getEdges.checksumHashCode)
    +      return checksum
    --- End diff --
    
    `return` is not necessary here. Scala takes the last expression as the return value.


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1462#issuecomment-171934077
  
    Thanks, will merge this!


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1462#discussion_r49789156
  
    --- Diff: flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.Utils.ChecksumHashCode
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +
    +import scala.reflect.ClassTag
    +
    +package object utils {
    +  /**
    +    * This class provides utility methods for computing checksums over a Graph.
    +    *
    +    * @param self Graph
    +    */
    +  implicit class GraphSetUtils[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
    +  TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {
    +
    +    /**
    +      * Computes the ChecksumHashCode over the Graph.
    +      *
    +      * @return the ChecksumHashCode over the vertices and edges.
    +      */
    +    @throws(classOf[Exception])
    +    def checksumHashCode[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
    --- End diff --
    
    Would be good to give this method parentheses. It triggers distributed execution, so it is not quite side-effect free.


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1462#issuecomment-171804466
  
    @StephanEwen Thanks for noting that! The tests needed to be moved from the "operator" directory into the classes with utility tests.


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1462#discussion_r47931079
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -394,6 +396,21 @@ public long count() throws Exception {
     		return res.<Long> getAccumulatorResult(id);
     	}
     
    +	/**
    +	 * Convenience method to get the count (number of elements) of a DataSet
    +	 * as well as the checksum (sum over element hashes).
    +	 *
    +	 * @return A Checksum that represents the count and checksum of elements in the data set.
    +	 */
    +	public Checksum checksum() throws Exception {
    +		final String id = new AbstractID().toString();
    +
    +		flatMap(new Utils.ChecksumHelper<T>(id)).name("checksum()")
    +				.output(new DiscardingOutputFormat<NullValue>()).name("checksum() sink");
    --- End diff --
    
    `RichSinkFunction` is in the flink-streaming-java module which is not a dependency of flink-java. Can an execution terminate without a `DataSink`?


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1462#issuecomment-167370924
  
    Yes, exactly, I had the `DataSetUtils` in mind.


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1462#issuecomment-165524009
  
    Were you thinking of `DataSetUtils`? That looks like a nice home for this and already includes zip-with and sampling functions.


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1462#issuecomment-165489647
  
    I think this is a nice addition. Unsure, though, if `checksum()` should be added to the `DataSet` directly. It is a tradeoff between convenience and API overload.
    
    A simpler alternative would be `Checksum chk = Utils.checksum(dataSet);`.
    
    Also, this is one way of computing a simple checksum (hash code). There may be other arithmetic methods, so it makes sense to include the name of the checksum method in the method name. How about something like `Utils.checksumHashCode(dataSet)` or so?
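To illustrate why the method name should say which arithmetic it uses, here is a plain-Java sketch comparing the sum of hash codes with one hypothetical alternative, an XOR of hash codes. The XOR variant was not proposed in this thread, and both method names are made up for the example. The only point is that different arithmetic yields different checksums with different properties: XOR cancels any element that appears an even number of times, while the sum does not.

```java
import java.util.Arrays;
import java.util.List;

public class ChecksumVariants {

    // Sum of hashCodes: the approach described in this PR.
    static long checksumHashCodeSum(List<?> elements) {
        long sum = 0;
        for (Object e : elements) {
            sum += e.hashCode();
        }
        return sum;
    }

    // Hypothetical alternative for comparison only: XOR of hashCodes.
    // An element appearing twice cancels itself out.
    static long checksumHashCodeXor(List<?> elements) {
        long xor = 0;
        for (Object e : elements) {
            xor ^= e.hashCode();
        }
        return xor;
    }

    public static void main(String[] args) {
        List<String> withDuplicate = Arrays.asList("x", "y", "y");
        List<String> withoutDuplicate = Arrays.asList("x");

        // The sum distinguishes the two inputs; XOR does not.
        System.out.println(checksumHashCodeSum(withDuplicate) == checksumHashCodeSum(withoutDuplicate)); // prints false
        System.out.println(checksumHashCodeXor(withDuplicate) == checksumHashCodeXor(withoutDuplicate)); // prints true
    }
}
```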


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1462#discussion_r47921105
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -394,6 +396,21 @@ public long count() throws Exception {
     		return res.<Long> getAccumulatorResult(id);
     	}
     
    +	/**
    +	 * Convenience method to get the count (number of elements) of a DataSet
    +	 * as well as the checksum (sum over element hashes).
    +	 *
    +	 * @return A Checksum that represents the count and checksum of elements in the data set.
    +	 */
    +	public Checksum checksum() throws Exception {
    +		final String id = new AbstractID().toString();
    +
    +		flatMap(new Utils.ChecksumHelper<T>(id)).name("checksum()")
    +				.output(new DiscardingOutputFormat<NullValue>()).name("checksum() sink");
    --- End diff --
    
    Saves one operator and a source of confusion in the UI. Actually, `collect()` and `count()` should be similarly simplified, come to think of it ;-)


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1462#issuecomment-170701135
  
    I'm not seeing a notification email for the second commit I just pushed to my repo. Also, I have attempted to get the Scala API correct but that needs a good review.


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1462#discussion_r49435184
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala ---
    @@ -103,6 +105,25 @@ package object utils {
           : DataSet[T] = {
           wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSamples, seed))
         }
    +
    +    // --------------------------------------------------------------------------------------------
    +    //  Checksum
    +    // --------------------------------------------------------------------------------------------
    +
    +    /**
    +      * Convenience method to get the count (number of elements) of a DataSet
    +      * as well as the checksum (sum over element hashes).
    +      *
    +      * @return A Checksum that represents the count and checksum of elements in the data set.
    +      *
    +      * @see org.apache.flink.api.java.Utils.ChecksumHelper
    --- End diff --
    
    In Scaladoc, one can link using `[[org.apache.flink.api.java.Utils.ChecksumHelper]]`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1462#discussion_r49789099
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala ---
    @@ -103,6 +105,25 @@ package object utils {
           : DataSet[T] = {
           wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSamples, seed))
         }
    +
    +    // --------------------------------------------------------------------------------------------
    +    //  Checksum
    +    // --------------------------------------------------------------------------------------------
    +
    +    /**
    +      * Convenience method to get the count (number of elements) of a DataSet
    +      * as well as the checksum (sum over element hashes).
    +      *
    +      * @return A ChecksumHashCode with the count and checksum of elements in the data set.
    +      *
    +      * @see [[org.apache.flink.api.java.Utils.ChecksumHashCodeHelper]]
    +      */
    +    def checksumHashCode: ChecksumHashCode = {
    --- End diff --
    
    Would be good to give this method parentheses. It triggers distributed execution, so it is not quite side-effect free.


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1462#discussion_r47920383
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -394,6 +396,21 @@ public long count() throws Exception {
     		return res.<Long> getAccumulatorResult(id);
     	}
     
    +	/**
    +	 * Convenience method to get the count (number of elements) of a DataSet
    +	 * as well as the checksum (sum over element hashes).
    +	 *
    +	 * @return A Checksum that represents the count and checksum of elements in the data set.
    +	 */
    +	public Checksum checksum() throws Exception {
    +		final String id = new AbstractID().toString();
    +
    +		flatMap(new Utils.ChecksumHelper<T>(id)).name("checksum()")
    +				.output(new DiscardingOutputFormat<NullValue>()).name("checksum() sink");
    --- End diff --
    
    This trick to have a `flatMap()` function and then a discarding sink comes from earlier versions of Flink, where Sinks could not use Accumulators.
    
    Since RichSinkFunctions can use accumulators, it would be good to actually have the Checksum helper as a sink.


[GitHub] flink pull request: [FLINK-2716] [gelly java scala] New checksum m...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1462#discussion_r48012505
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -394,6 +396,21 @@ public long count() throws Exception {
     		return res.<Long> getAccumulatorResult(id);
     	}
     
    +	/**
    +	 * Convenience method to get the count (number of elements) of a DataSet
    +	 * as well as the checksum (sum over element hashes).
    +	 *
    +	 * @return A Checksum that represents the count and checksum of elements in the data set.
    +	 */
    +	public Checksum checksum() throws Exception {
    +		final String id = new AbstractID().toString();
    +
    +		flatMap(new Utils.ChecksumHelper<T>(id)).name("checksum()")
    +				.output(new DiscardingOutputFormat<NullValue>()).name("checksum() sink");
    --- End diff --
    
    Sorry, I meant a `RichOutputFormat` https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/RichOutputFormat.java
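The sink-based design suggested in this review can be sketched without Flink. The types below are hypothetical stand-ins that only mirror the shape of an output format (`open(taskNumber, numTasks)`, `writeRecord`, `close`) and of a mergeable accumulator; they are not the Flink classes. Each simulated parallel instance sums its own records and publishes its partial result on `close()`, and the caller merges the partials, which is the role the accumulator result plays after execution. No intermediate `flatMap` operator or discarding sink is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins; NOT the Flink API.
public class SinkChecksumSketch {

    // Stand-in for an accumulator: a mergeable count and hash sum.
    static final class Totals {
        long count;
        long checksum;

        void merge(Totals other) {
            count += other.count;
            checksum += other.checksum;
        }
    }

    // Stand-in for a sink: each parallel instance sums locally and
    // publishes its partial result when it is closed.
    static final class ChecksumSink<T> {
        private final List<Totals> published;
        private Totals local;

        ChecksumSink(List<Totals> published) {
            this.published = published;
        }

        void open(int taskNumber, int numTasks) {
            local = new Totals(); // fresh per-instance state
        }

        void writeRecord(T record) {
            local.count++;
            local.checksum += record.hashCode();
        }

        void close() {
            published.add(local);
        }
    }

    // Simulates numTasks sink instances over round-robin partitions,
    // then merges their partial results into one total.
    static <T> Totals run(List<T> data, int numTasks) {
        List<Totals> published = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            ChecksumSink<T> sink = new ChecksumSink<>(published);
            sink.open(task, numTasks);
            for (int i = task; i < data.size(); i += numTasks) {
                sink.writeRecord(data.get(i));
            }
            sink.close();
        }
        Totals result = new Totals();
        for (Totals partial : published) {
            result.merge(partial);
        }
        return result;
    }

    public static void main(String[] args) {
        Totals t = run(Arrays.asList(1, 2, 3, 4, 5), 2);
        System.out.println(t.count + " " + t.checksum); // prints "5 15"
    }
}
```

With five integers split across two simulated tasks, the merged result is a count of 5 and a checksum of 15, because `Integer.hashCode()` returns the value itself.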

