You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by andralungu <gi...@git.apache.org> on 2015/06/14 00:24:21 UTC

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

GitHub user andralungu opened a pull request:

    https://github.com/apache/flink/pull/832

    [FLINK-2152] Added zipWithIndex 

    This PR adds the zipWithIndex utility method to Flink's DataSetUtils as described in the mailing list discussion: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/The-correct-location-for-zipWithIndex-and-zipWithUniqueId-td6310.html. 
    
    The method could, in the future, be moved to DataSet. 
    
    @fhueske , @tillrohrmann , once we reach a conclusion for this one, I will also update #801 (I wouldn't like to fix unnecessary merge conflicts). 
    
    Once zipWIthUniqueIds is added, I could also explain the difference in the docs. 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andralungu/flink zipWithIndex

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/832.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #832
    
----
commit fdbf0167cc10e952faddc2a7d71e73e7f1f2d03f
Author: andralungu <lu...@gmail.com>
Date:   2015-06-12T18:37:27Z

    [FLINK-2152] zipWithIndex implementation
    
    [FLINK-2152] Added zipWithIndex utility method

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-114481973
  
    Hey @tillrohrmann , 
    
    Sorry for the incredibly late reply. The last weeks have been very hectic. 
    Nevertheless, I'd like to properly finish and polish this issue very soon. 
    
    For that: I have addressed the Java comments, but I still have to provide support for Scala. I love this task because it really takes me out of my comfort zone: Gelly and Java. It's no secret that Scala is not my strongest point. Therefore, I'd like to use this thread to ask some rather trivial questions: 
    
    Before defining implicit methods and using pimp-my-lib, I need to wrap the Java function. Which should be easy right? Since there is a `wrap` method.
    This being said, in org.apache.flink.api.scala, I created a DataSetUtils class. and wanted to call wrap(ju.countElements...). Apparently it does not let me. Can someone help me out with that?
    
    Thanks!
     


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by alexeygrigorev <gi...@git.apache.org>.
Github user alexeygrigorev commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32423042
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    +
    +		DataSet<Tuple2<Integer, Long>> elementCount = countElements(input);
    +
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
    +
    +			long start = 0;
    +
    +			// compute the offset for each partition
    +			@Override
    +			public void open(Configuration parameters) throws Exception {
    +				super.open(parameters);
    +
    +				List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
    +
    +				Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
    +					@Override
    +					public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
    +						return compareInts(o1.f0, o2.f0);
    +					}
    +				});
    +
    +				for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
    +					start += offsets.get(i).f1;
    +				}
    +			}
    +
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) throws Exception {
    +				for(T value: values) {
    +					out.collect(new Tuple2<Long, T>(start++, value));
    +				}
    +			}
    +		}).withBroadcastSet(elementCount, "counts");
    +	}
    +
    +	private static int compareInts(long x, int y) {
    +		return (x < y) ? -1 : ((x == y) ? 0 : 1);
    --- End diff --
    
    `x - y` does not always work http://stackoverflow.com/questions/2728793/java-integer-compareto-why-use-comparison-vs-subtraction


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-116234551
  
    Perfectly valid comments, thanks! PR Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-115134406
  
    Hey Theo, 
    
    Thanks a lot for finding my bug there ^^
    PR updated to address the Java issues and to contain a  pimped Scala version of `zipWithIndex` :) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-114883882
  
    Uhmmm... flink.api.scala is imported. That's not the issue. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r33419459
  
    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.util
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
    +import org.junit.rules.TemporaryFolder
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +import org.junit.{After, Before, Rule, Test}
    +import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUtilsITCase (mode: MultipleProgramsTestBase.TestExecutionMode) extends
    +MultipleProgramsTestBase(mode){
    +
    +  private var resultPath: String = null
    +  private var expectedResult: String = null
    +
    +  var tempFolder: TemporaryFolder = new TemporaryFolder()
    +
    +  @Rule
    +  def getFolder(): TemporaryFolder = {
    +    tempFolder;
    +  }
    +
    +  @Before
    +  @throws(classOf[Exception])
    +  def before {
    --- End diff --
    
    It would be better to insert a empty parenthesis and declare return type as Unit to match coding style of other test cases.
    
    I think `def before(): Unit = {` is better than now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r33419418
  
    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.util
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
    +import org.junit.rules.TemporaryFolder
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +import org.junit.{After, Before, Rule, Test}
    +import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUtilsITCase (mode: MultipleProgramsTestBase.TestExecutionMode) extends
    +MultipleProgramsTestBase(mode){
    +
    +  private var resultPath: String = null
    +  private var expectedResult: String = null
    +
    +  var tempFolder: TemporaryFolder = new TemporaryFolder()
    +
    +  @Rule
    +  def getFolder(): TemporaryFolder = {
    +    tempFolder;
    --- End diff --
    
    Unnecessary semicolon :) You can simplify this method as `def getFolder = tempFolder`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32414522
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    --- End diff --
    
    Well, moving it to `DataSet` means to change the API of Flink and that's something we first have to decide. Therefore, we first wanted to provide this functionality as part of a utils class. Since `zipWithIndex` is stateless with respect to the utils class, it should be `static`. For the Scala API you can, though, use the pimp my library pattern to allow a syntax `ds.zipWithIndex()`, with `ds` being some `DataSet`, without having to change the Scala API.
    
    Once we have decided whether these methods shall become part of the API, we can simply add them to the Java and Scala `DataSet` classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32415043
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    --- End diff --
    
    The reason for not having it in the core API is to keep that API slim and not to overload it with functionality that is not very often used (which is not always easy to decide). The pimp-my-lib pattern is useful if you cannot change the API directly (which we could). If we pimp the API, we could also move it directly to DataSet.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r33419462
  
    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.util
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
    +import org.junit.rules.TemporaryFolder
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +import org.junit.{After, Before, Rule, Test}
    +import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUtilsITCase (mode: MultipleProgramsTestBase.TestExecutionMode) extends
    +MultipleProgramsTestBase(mode){
    +
    +  private var resultPath: String = null
    +  private var expectedResult: String = null
    +
    +  var tempFolder: TemporaryFolder = new TemporaryFolder()
    +
    +  @Rule
    +  def getFolder(): TemporaryFolder = {
    +    tempFolder;
    +  }
    +
    +  @Before
    +  @throws(classOf[Exception])
    +  def before {
    +    resultPath = tempFolder.newFile.toURI.toString
    +  }
    +
    +  @Test
    +  @throws(classOf[Exception])
    +  def testZipWithIndex {
    +    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +
    +    val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F")
    +    val result: DataSet[(Long, String)] = input.zipWithIndex
    +
    +    result.writeAsCsv(resultPath, "\n", ",")
    +    env.execute()
    +
    +    expectedResult = "0,A\n" + "1,B\n" + "2,C\n" + "3,D\n" + "4,E\n" + "5,F"
    +  }
    +
    +  @After
    +  @throws(classOf[Exception])
    +  def after {
    --- End diff --
    
    Add a parenthesis and return type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32430399
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    +
    +		DataSet<Tuple2<Integer, Long>> elementCount = countElements(input);
    +
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
    +
    +			long start = 0;
    +
    +			// compute the offset for each partition
    +			@Override
    +			public void open(Configuration parameters) throws Exception {
    +				super.open(parameters);
    +
    +				List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
    +
    +				Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
    +					@Override
    +					public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
    +						return compareInts(o1.f0, o2.f0);
    +					}
    +				});
    +
    +				for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
    +					start += offsets.get(i).f1;
    +				}
    +			}
    +
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) throws Exception {
    +				for(T value: values) {
    +					out.collect(new Tuple2<Long, T>(start++, value));
    +				}
    +			}
    +		}).withBroadcastSet(elementCount, "counts");
    +	}
    +
    +	private static int compareInts(long x, int y) {
    +		return (x < y) ? -1 : ((x == y) ? 0 : 1);
    --- End diff --
    
    Ah, this makes sense :+1:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32404567
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    +
    +		DataSet<Tuple2<Integer, Long>> elementCount = countElements(input);
    +
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
    +
    +			long start = 0;
    +
    +			// compute the offset for each partition
    +			@Override
    +			public void open(Configuration parameters) throws Exception {
    +				super.open(parameters);
    +
    +				List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
    +
    +				Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
    +					@Override
    +					public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
    +						return compareInts(o1.f0, o2.f0);
    +					}
    +				});
    +
    +				for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
    +					start += offsets.get(i).f1;
    +				}
    +			}
    +
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) throws Exception {
    +				for(T value: values) {
    +					out.collect(new Tuple2<Long, T>(start++, value));
    +				}
    +			}
    +		}).withBroadcastSet(elementCount, "counts");
    +	}
    +
    +	private static int compareInts(long x, int y) {
    --- End diff --
    
    why is the first parameter a `long`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32415490
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    --- End diff --
    
    But you still would have to import the implicit cast for the pimp my library class to make it usable. Thus it is only syntactic sugar for writing something like `DataSetUtils.zipWithIndex(ds)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-116266267
  
    No problem ;)
    In this case I would kindly ask @andralungu to close the issue manually.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32404633
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    +
    +		DataSet<Tuple2<Integer, Long>> elementCount = countElements(input);
    +
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
    +
    +			long start = 0;
    +
    +			// compute the offset for each partition
    +			@Override
    +			public void open(Configuration parameters) throws Exception {
    +				super.open(parameters);
    +
    +				List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
    +
    +				Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
    +					@Override
    +					public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
    +						return compareInts(o1.f0, o2.f0);
    +					}
    +				});
    +
    +				for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
    +					start += offsets.get(i).f1;
    +				}
    +			}
    +
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) throws Exception {
    +				for(T value: values) {
    +					out.collect(new Tuple2<Long, T>(start++, value));
    +				}
    +			}
    +		}).withBroadcastSet(elementCount, "counts");
    +	}
    +
    +	private static int compareInts(long x, int y) {
    +		return (x < y) ? -1 : ((x == y) ? 0 : 1);
    --- End diff --
    
    Why not simply doing `return x-y;`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-114878728
  
    Actually, I get a weird compile error: it says missing Type parameter for the map in DataSet.scala...
    I think this is because the map is overriden... and I haven't found the workaround just yet...
    (The error is reproducible by calling `testZipWithIndex`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by thvasilo <gi...@git.apache.org>.
Github user thvasilo commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-114880352
  
    Is it a type inference problem? Have you tried importing org.apache.flink.api.scala._ to see if that fixes it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32404396
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    --- End diff --
    
    retrieve


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-111856162
  
    +1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32439968
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    --- End diff --
    
    +1 for making it static


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-112000953
  
    Good work @andralungu.
    
    The data set utils are not working with the Scala API. Would be nice to support for Scala a syntax like
    ```
    val ds: DataSet[String] = ...
    val zipped: DataSet[(Long, String)] = ds.zipWithIndex
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32405496
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.util;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +@RunWith(Parameterized.class)
    +public class DataSetUtilsITCase extends MultipleProgramsTestBase {
    +
    +	private String resultPath;
    +	private String expectedResult;
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	public DataSetUtilsITCase(TestExecutionMode mode) {
    +		super(mode);
    +	}
    +
    +	@Before
    +	public void before() throws Exception{
    +		resultPath = tempFolder.newFile().toURI().toString();
    +	}
    +
    +	@Test
    +	public void testZipWithIndex() throws Exception {
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F");
    +
    +		DataSetUtils<String> dataSetUtils = new DataSetUtils<String>();
    --- End diff --
    
    Why do we have to create a `DataSetUtils` object here? So far it does not store any state.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu closed the pull request at:

    https://github.com/apache/flink/pull/832


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by thvasilo <gi...@git.apache.org>.
Github user thvasilo commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-114895752
  
    Seems like the problem was that the wrap was returning a DataSet[(java.lang.Long, T)]
    
    If you change the map to `.map { t => (t.f0.toLong, t.f1) }` it should work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-114864032
  
    Sorry for my late reply. Has everything worked out by now?
    
    On Tue, Jun 23, 2015 at 2:43 PM, Andra Lungu <no...@github.com>
    wrote:
    
    > Hey @tillrohrmann <https://github.com/tillrohrmann> ,
    >
    > Sorry for the incredibly late reply. The last weeks have been very hectic.
    > Nevertheless, I'd like to properly finish and polish this issue very soon.
    >
    > For that: I have addressed the Java comments, but I still have to provide
    > support for Scala. I love this task because it really takes me out of my
    > comfort zone: Gelly and Java. It's no secret that Scala is not my strongest
    > point. Therefore, I'd like to use this thread to ask some rather trivial
    > questions:
    >
    > Before defining implicit methods and using pimp-my-lib, I need to wrap the
    > Java function. Which should be easy right? Since there is a wrap method.
    > This being said, in org.apache.flink.api.scala, I created a DataSetUtils
    > class. and wanted to call wrap(ju.countElements...). Apparently it does not
    > let me. Can someone help me out with that?
    >
    > Thanks!
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/flink/pull/832#issuecomment-114481973>.
    >



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by alexeygrigorev <gi...@git.apache.org>.
Github user alexeygrigorev commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32411775
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    --- End diff --
    
    As a user, I'd prefer having `DataSet.zipWithIndex` rather than some utils class. This way it'll also be like in Scala collections and Spark - so there should be no surprises for users. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-116252346
  
    Oops! I forgot add "This closes #832" into commit message. I mistook because this is my first commit to upload Apache repository. Sorry. How can I fix it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32410253
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    --- End diff --
    
    Yes, I thought about making the method static. However, if we move it to DataSet, we would need to call it from an instance. Would you do DataSet.zipWithIndex()?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32405317
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    +				}
    +
    +				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Method that takes a set of subtask index, total number of elements mappings
    +	 * and assigns ids to all the elements from the input data set.
    +	 *
    +	 * @param input the input data set
    +	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
    +	 */
    +	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
    --- End diff --
    
    Wouldn't it be better to make this method static?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r33419461
  
    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.util
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
    +import org.junit.rules.TemporaryFolder
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +import org.junit.{After, Before, Rule, Test}
    +import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUtilsITCase (mode: MultipleProgramsTestBase.TestExecutionMode) extends
    +MultipleProgramsTestBase(mode){
    +
    +  private var resultPath: String = null
    +  private var expectedResult: String = null
    +
    +  var tempFolder: TemporaryFolder = new TemporaryFolder()
    +
    +  @Rule
    +  def getFolder(): TemporaryFolder = {
    +    tempFolder;
    +  }
    +
    +  @Before
    +  @throws(classOf[Exception])
    +  def before {
    +    resultPath = tempFolder.newFile.toURI.toString
    +  }
    +
    +  @Test
    +  @throws(classOf[Exception])
    +  def testZipWithIndex {
    --- End diff --
    
    Add a parenthesis and return type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-116252016
  
    Looks good :) merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the pull request:

    https://github.com/apache/flink/pull/832#issuecomment-116208070
  
    Hi, I added some minor comments about coding style in Scala test case. The rest things is okay.
    I think we can merge this after fixing the style.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/832#discussion_r32404719
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +/**
    + * This class provides simple utility methods for zipping elements in a file with an index.
    + *
    + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
    + */
    +public class DataSetUtils<T> {
    +
    +	/**
    +	 * Method that goes over all the elements in each partition in order to retireve
    +	 * the total number of elements.
    +	 *
    +	 * @param input the DataSet received as input
    +	 * @return a data set containing tuples of subtask index, number of elements mappings.
    +	 */
    +	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
    +		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer,Long>>() {
    +			@Override
    +			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
    +				long counter = 0;
    +				for(T value: values) {
    +					counter ++;
    --- End diff --
    
    whitespace between `counter` and `++`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---