You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2014/07/07 16:51:30 UTC

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

GitHub user uce opened a pull request:

    https://github.com/apache/incubator-flink/pull/63

    [FLINK-758] Add count operator to DataSet

    - Adds a count operator to DataSet:
    
    ```java
    DataSet<String> text = env.fromElements(
        "Who's there?",
        "I think I hear them. Stand, ho! Who's there?");
    DataSet<Long> count = text.count(); // 2
    ```
    - Adds an all (ungrouped) reduce variant, which allows to specify an initial value to the reduce function
    - Adds an utility method to InstantiationUtil to serialize a record to a byte array
    - Fixes some javadocs warnings in DataSet
    
    I've tested the count operator on a cluster with DOP 1, 200 and verified the results with `cat | wc -l`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/incubator-flink FLINK-758-count_operator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/63.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #63
    
----
commit 7ae6ffb8f25525a1633fe27a08af0604faa5b3b6
Author: uce <u....@fu-berlin.de>
Date:   2014-07-03T14:35:48Z

    Add byte array serialization to InstantiationUtil

commit 3d6b1fe9c8efd1ab31cc260a29663c60e3fb1116
Author: uce <u....@fu-berlin.de>
Date:   2014-07-07T13:57:45Z

    Add initial value to all (ungrouped) reduce
    
    Adds an all (ungrouped) reduce variant to DataSet, which allows to
    specify an initial value for the ReduceFunction. The initial value is
    an extra input element to the ReduceFuntion and needs to be of the same
    type as the ReduceFunction input type. The initial value is not used
    with the combiners.
    
    In cases, where there is no input to the reduce function, for example
    after a filter operator, which filters all elements, the ReduceFunction
    will be called with the initial value only.

commit dfbe9ed6557fae798ba82ffff79809e93b779164
Author: uce <u....@fu-berlin.de>
Date:   2014-07-07T13:59:26Z

    [FLINK-758] Add count operator to DataSet
    
    Adds a count method to DataSet, which translates to a map-reduce. The
    map operator maps each element to a 1 and the reduce operator sums up
    all the 1s, resulting in the total count of elements.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by hsaputra <gi...@git.apache.org>.
Github user hsaputra commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-48392105
  
    Does it need corresponding Scala API counterpart?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-48443910
  
    It already has the corresponding counterpart. But after your question I realized that the Java API is lacking the count on grouped datasets, which the Scala API supports.
    
    ```
    input.count() // works
    input.groupBy(...).count() // does not work
    ```
    
    I will add it to this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-49601334
  
    I've rebased this PR on the renamed master, but it is just a single commit (not the most elegant way to do this... if someone complains I'll fix it).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-63621886
  
    I'll think it's best to close this PR. It's superceded by the upcoming intermediate result changes and #210. Nobody complained about not having a fold-style reduce with an initial value, so I think it's OK to discard these features as well. I will only merge the byte array serialization commit (1b893a1245c1f7317c9f0a062fc38ceabad7afcf) of this PR as I think it might be useful in general.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-54751876
  
    Thanks for the review. The initial value for the reduce function and the count operator are tightly connected. The reduce with initial value is the general solution, of which the count operator is a special case. Therefore, I wouldn't say that these are independent features. The refactorings are also limited to files related to the initial value reduce/count operator.
    
    The counting for grouped data sets was a quick fix after @hsaputra's comment. We can either fix it with this PR or open a seperate issue if we want to merge it.
    
    I think the limitation to AllReduce was the result of a discussion with you and @StephanEwen.
    
    ---
    
    All in all, I think that we should wait for the upcoming changes to the runtime and scheduler to support the more intuitive API of simply returning the count to the user program. As you said, we might move some of the changes (like initial value reduce) to a separate issue if we find them useful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/63#discussion_r17215488
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CountOperator.java ---
    @@ -0,0 +1,125 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.commons.lang3.Validate;
    +import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    +import org.apache.flink.api.common.operators.base.MapOperatorBase;
    +import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.functions.MapFunction;
    +import org.apache.flink.api.java.functions.ReduceFunction;
    +import org.apache.flink.api.java.typeutils.BasicTypeInfo;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Iterator;
    +
    +/**
    + * A {@link DataSet} that is the result of a count transformation.
    + * <p/>
    + * The count will be executed as a map-reduce. The map operator maps every element of the input to a 1 and the all
    + * reduce sums the ones up to the total count.
    + *
    + * @param <IN> The type of the data set aggregated by the operator.
    + */
    +public class CountOperator<IN> extends SingleInputUdfOperator<IN, Long, CountOperator<IN>> {
    +
    +	private final Grouping<IN> grouping;
    +
    +	public CountOperator(DataSet<IN> input) {
    +		super(input, BasicTypeInfo.LONG_TYPE_INFO);
    +		grouping = null;
    +	}
    +
    +	public CountOperator(Grouping<IN> input) {
    +		super(Validate.notNull(input).getDataSet(), BasicTypeInfo.LONG_TYPE_INFO);
    +		this.grouping = input;
    +	}
    +
    +	@Override
    +	protected org.apache.flink.api.common.operators.SingleInputOperator<?, Long, ?> translateToDataFlow(
    +			org.apache.flink.api.common.operators.Operator<IN> input) {
    +		if (grouping == null) {
    +			// map to ones
    +			UnaryOperatorInformation<IN, Long> countMapOpInfo =
    +					new UnaryOperatorInformation<IN, Long>(getInputType(), BasicTypeInfo.LONG_TYPE_INFO);
    +			MapOperatorBase<IN, Long, MapFunction<IN, Long>> countMapOp =
    +					new MapOperatorBase<IN, Long, MapFunction<IN, Long>>(
    +							new CountingMapUdf(), countMapOpInfo, "Count: map to ones");
    +
    +			countMapOp.setInput(input);
    +			countMapOp.setDegreeOfParallelism(input.getDegreeOfParallelism());
    +
    +			// sum ones
    +			UnaryOperatorInformation<Long, Long> countReduceOpInfo =
    +					new UnaryOperatorInformation<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    +			ReduceOperatorBase<Long, ReduceFunction<Long>> countReduceOp =
    +					new ReduceOperatorBase<Long, ReduceFunction<Long>>(
    +							new CountingReduceUdf(), countReduceOpInfo, "Count: sum ones");
    +
    +			countReduceOp.setInput(countMapOp);
    +			countReduceOp.setDegreeOfParallelism(1);
    +			countReduceOp.setInitialValue(countReduceOpInfo.getInputType().createSerializer(), 0L);
    +
    +			return countReduceOp;
    +		}
    +		else {
    +			return new ReduceGroupOperator<IN, Long>(grouping, new CountingGroupReduceUdf<IN>())
    --- End diff --
    
    +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-54748255
  
    I had a look at this PR and found a few issues:
    - it contains changes for several independent features
      - Initial value for ReduceFunction
      - Count operator
      - many cosmetic changes / documentation improvements
    - my gut feeling is, that rebasing this PR onto the current master will cause many merge conflicts. It might be worthwhile to separate these issues into independent PRs to make the merging easier.
    - counting for grouped datasets is done with a non-combinable GroupReduceFunction which is not vey efficient
    - An initial value for ReduceFunction is only supported for AllReduce. I see that the original motivation for this (a 0-valued count for empty datasets) does not make sense for grouped ReduceFunctions, but this is not the only way an initial value could be used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-48483961
  
    I think it should work, yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-48454584
  
    Should be in line with Scala now.
    
    @aljoscha: expression key grouping did not work wih group reduce. after looking into the reduce operator, where it does work, I figured that the small change in 3ccb50032af8eebada19d0e2ac5ec1a0e62ba99c should be enough. Can you confirm this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by uce <gi...@git.apache.org>.
Github user uce closed the pull request at:

    https://github.com/apache/incubator-flink/pull/63


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-63457614
  
    See #210.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-54807258
  
    Bear in mind that the count will be given for free also when the first-class handling of intermediate results is merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/63#issuecomment-54794514
  
    Sure, the count op requires the init value, but not vice versa. I meant we could do the init value change first (which also has a separate JIRA I think) and put the count op on top.
    
    If we wait for the runtime changes, should we just close this PR for now and extract the init value changes for a separate PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-758] Add count operator to Da...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/63#discussion_r17214869
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CountOperator.java ---
    @@ -0,0 +1,125 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.commons.lang3.Validate;
    +import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    +import org.apache.flink.api.common.operators.base.MapOperatorBase;
    +import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.functions.MapFunction;
    +import org.apache.flink.api.java.functions.ReduceFunction;
    +import org.apache.flink.api.java.typeutils.BasicTypeInfo;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Iterator;
    +
    +/**
    + * A {@link DataSet} that is the result of a count transformation.
    + * <p/>
    + * The count will be executed as a map-reduce. The map operator maps every element of the input to a 1 and the all
    + * reduce sums the ones up to the total count.
    + *
    + * @param <IN> The type of the data set aggregated by the operator.
    + */
    +public class CountOperator<IN> extends SingleInputUdfOperator<IN, Long, CountOperator<IN>> {
    +
    +	private final Grouping<IN> grouping;
    +
    +	public CountOperator(DataSet<IN> input) {
    +		super(input, BasicTypeInfo.LONG_TYPE_INFO);
    +		grouping = null;
    +	}
    +
    +	public CountOperator(Grouping<IN> input) {
    +		super(Validate.notNull(input).getDataSet(), BasicTypeInfo.LONG_TYPE_INFO);
    +		this.grouping = input;
    +	}
    +
    +	@Override
    +	protected org.apache.flink.api.common.operators.SingleInputOperator<?, Long, ?> translateToDataFlow(
    +			org.apache.flink.api.common.operators.Operator<IN> input) {
    +		if (grouping == null) {
    +			// map to ones
    +			UnaryOperatorInformation<IN, Long> countMapOpInfo =
    +					new UnaryOperatorInformation<IN, Long>(getInputType(), BasicTypeInfo.LONG_TYPE_INFO);
    +			MapOperatorBase<IN, Long, MapFunction<IN, Long>> countMapOp =
    +					new MapOperatorBase<IN, Long, MapFunction<IN, Long>>(
    +							new CountingMapUdf(), countMapOpInfo, "Count: map to ones");
    +
    +			countMapOp.setInput(input);
    +			countMapOp.setDegreeOfParallelism(input.getDegreeOfParallelism());
    +
    +			// sum ones
    +			UnaryOperatorInformation<Long, Long> countReduceOpInfo =
    +					new UnaryOperatorInformation<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    +			ReduceOperatorBase<Long, ReduceFunction<Long>> countReduceOp =
    +					new ReduceOperatorBase<Long, ReduceFunction<Long>>(
    +							new CountingReduceUdf(), countReduceOpInfo, "Count: sum ones");
    +
    +			countReduceOp.setInput(countMapOp);
    +			countReduceOp.setDegreeOfParallelism(1);
    +			countReduceOp.setInitialValue(countReduceOpInfo.getInputType().createSerializer(), 0L);
    +
    +			return countReduceOp;
    +		}
    +		else {
    +			return new ReduceGroupOperator<IN, Long>(grouping, new CountingGroupReduceUdf<IN>())
    --- End diff --
    
    Using a non-combinable GroupReduceFunction for counting is unnecessarily inefficient.
    We could extract the key fields using a Mapper and add a count-1 and use a ReduceFunction as well.
    This requires a few cases due to different key types but should be the way to go.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---