You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by sekruse <gi...@git.apache.org> on 2014/07/01 15:52:30 UTC

[GitHub] incubator-flink pull request: providing accumulator example as pro...

GitHub user sekruse opened a pull request:

    https://github.com/apache/incubator-flink/pull/55

    providing accumulator example as proposed in FLINK-828

    - shows how to build a custom accumulator (for vectors)
    - employ that accumulator for obtaining filter statistics

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sekruse/incubator-flink FLINK-828

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/55.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #55
    
----
commit 2b12cbca8286b4c9567291afe17f85d6b6f4ecf6
Author: Sebastian Kruse <se...@hpi.de>
Date:   2014-07-01T13:22:18Z

    providing accumulator example as proposed in FLINK-828
    
    - shows how to build a custom accumulator (for vectors)
    - employ that accumulator for obtaining filter statistics

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14445872
  
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java ---
    @@ -0,0 +1,247 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.api.java.tuple.Tuple3;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per
    + * column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those, that at
    + * most contain whitespace characters like space and tab.
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
    + * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines &lt;input file path or "example"&gt; &lt;result path&gt;</code> <br>
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +	// *************************************************************************
    +	// PROGRAM
    +	// *************************************************************************
    +
    +	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +	public static void main(final String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		// get the data set
    +		final DataSet<Tuple> file = getDataSet(env);
    +
    +		// filter lines with empty fields
    +		final DataSet<Tuple> filteredLines = file
    +				.filter(new FilterFunction<Tuple>() {
    +
    +					// create a new accumulator in each filter function instance
    +					// accumulators can be merged later on
    +					private final VectorAccumulator emptyFieldCounter = new VectorAccumulator();
    +
    +					@Override
    +					public void open(final Configuration parameters) throws Exception {
    +						super.open(parameters);
    +
    +						// register the accumulator instance
    +						getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
    +								this.emptyFieldCounter);
    +					}
    +
    +					@Override
    +					public boolean filter(final Tuple t) {
    +						boolean containsEmptyFields = false;
    +
    +						// iterate over the tuple fields looking for empty ones
    +						for (int pos = 0; pos < t.getArity(); pos++) {
    +
    +							final String field = t.getField(pos);
    +							if (field == null || field.trim().isEmpty()) {
    +								containsEmptyFields = true;
    +
    +								// if an empty field is encountered, update the
    +								// accumulator
    +								this.emptyFieldCounter.add(pos);
    +							}
    +						}
    +
    +						return !containsEmptyFields;
    +					}
    +				});
    +
    +		// Here, we could do further processing with the filtered lines...
    +		filteredLines.writeAsCsv(outputPath);
    +
    +		// execute program
    +		final JobExecutionResult result = env.execute("Accumulator example");
    +
    +		// get the accumulator result via its registration key
    +		final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
    +		System.out.format("Number of detected empty fields per column: %s\n",
    +				emptyFields);
    +
    +	}
    +
    +	// *************************************************************************
    +	// UTIL METHODS
    +	// *************************************************************************
    +
    +	private static String filePath;
    +	private static String outputPath;
    +
    +	private static boolean parseParameters(final String[] programArguments) {
    +
    +		if (programArguments.length > 0) {
    +			if (programArguments.length == 2) {
    +				filePath = programArguments[0];
    +				outputPath = programArguments[1];
    +			} else {
    +				System.err.println("Usage: FilterAndCountIncompleteLines <input file path or \"example\"> <result path>");
    +				return false;
    +			}
    +		} else {
    +			System.err.println("This program expects a semicolon-delimited CSV file with nine columns.\n"
    --- End diff --
    
    Can you change the code that it does not require any parameters at all? (The user still has to figure out how to pass parameters to the program)
    So if the user does not specify any arguments, we use `getExampleInputTuples()` to get some input data. We print the result using dataSet.print() to stdout.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14443290
  
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java ---
    @@ -0,0 +1,281 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.operators.DataSource;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing so, it
    + * counts the number of empty fields per column within a CSV file using a custom
    + * accumulator for vectors. In this context, empty fields are those, that at
    + * most contain whitespace characters like space and tab.
    + * 
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field separator
    + * and double quotes as field delimiters and 9 columns. See
    + * {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * 
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines &lt;input file path&gt; &lt;result path&gt;</code> <br>
    + * 
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +	// *************************************************************************
    +	// PROGRAM
    +	// *************************************************************************
    +
    +	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment
    +				.getExecutionEnvironment();
    +
    +		// get the data set
    +		DataSet<Tuple> file = getDataSet(env);
    --- End diff --
    
    Why are you not using a `Tuple9<String,String, ...>` for the DataSet ?
    This works also fine but you are loosing the compile-time type checking. Basically the typeparamters will make line 105: https://github.com/apache/incubator-flink/pull/55/files#diff-70cc9ad1c8f8e2e82718452620b0d33eR105 safer.
    
    I think its fine in this case to use the `Tuple` but I want to note that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14448525
  
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java ---
    @@ -0,0 +1,248 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.api.java.tuple.Tuple3;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per
    + * column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those, that at
    + * most contain whitespace characters like space and tab.
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
    + * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines [&lt;input file path&gt; [&lt;result path&gt;]]</code> <br>
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +	// *************************************************************************
    +	// PROGRAM
    +	// *************************************************************************
    +
    +	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +	public static void main(final String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		// get the data set
    +		final DataSet<Tuple> file = getDataSet(env);
    +
    +		// filter lines with empty fields
    +		final DataSet<Tuple> filteredLines = file.filter(new FilterFunction<Tuple>() {
    --- End diff --
    
    Would you mind not defining the FilterFunction inline. 
    IMO, separating the data flow and the UDF logic is easier to read, esp. if the UDF implements multiple methods.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/55#issuecomment-47756246
  
    Code looks good. :smile:
    Can we change the name of the program? What do you think about EmptyFieldsCountAccumulator?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14442947
  
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java ---
    @@ -0,0 +1,281 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.operators.DataSource;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing so, it
    + * counts the number of empty fields per column within a CSV file using a custom
    + * accumulator for vectors. In this context, empty fields are those, that at
    + * most contain whitespace characters like space and tab.
    + * 
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field separator
    + * and double quotes as field delimiters and 9 columns. See
    + * {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * 
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines &lt;input file path&gt; &lt;result path&gt;</code> <br>
    + * 
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +	// *************************************************************************
    +	// PROGRAM
    +	// *************************************************************************
    +
    +	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment
    +				.getExecutionEnvironment();
    +
    +		// get the data set
    +		DataSet<Tuple> file = getDataSet(env);
    +
    +		// filter lines with empty fields
    +		DataSet<Tuple> filteredLines = file.filter(new FilterFunction<Tuple>() {
    +
    +			// create a new accumulator in each filter function instance
    +			// accumulators can be merged later on
    +			private VectorAccumulator emptyFieldCounter = new VectorAccumulator();
    +
    +			/*
    +			 * (non-Javadoc)
    --- End diff --
    
    Can you remove the "(non-Javadoc)" javadoc comments? They don't add any value, because its obvious that there is no Javadoc.
    
    We had quite a lot of these comments in our project in the past and did a huge search/replace session to remove all of them.
    
    As a side note: It seems that you've limited with width of your code. We are currently not limiting the width, so you don't need to wrap long lines.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/55#issuecomment-48932201
  
    I'm going to merge this pull request ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by sekruse <gi...@git.apache.org>.
Github user sekruse commented on the pull request:

    https://github.com/apache/incubator-flink/pull/55#issuecomment-47751359
  
    Okay, done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by sekruse <gi...@git.apache.org>.
Github user sekruse commented on the pull request:

    https://github.com/apache/incubator-flink/pull/55#issuecomment-47756694
  
    Since the main purpose of this program is the demonstration of accumulators, that might actually be a good idea :wink:. I'll incorporate your feedback after lunch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-flink/pull/55


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/55#issuecomment-47743880
  
    Thank you for the contribution.
    
    I have some minor comments regarding comment style / example style. In general, your code is very good!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14443106
  
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java ---
    @@ -0,0 +1,281 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.operators.DataSource;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing so, it
    + * counts the number of empty fields per column within a CSV file using a custom
    + * accumulator for vectors. In this context, empty fields are those, that at
    + * most contain whitespace characters like space and tab.
    + * 
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field separator
    + * and double quotes as field delimiters and 9 columns. See
    + * {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * 
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines &lt;input file path&gt; &lt;result path&gt;</code> <br>
    + * 
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +	// *************************************************************************
    +	// PROGRAM
    +	// *************************************************************************
    +
    +	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment
    +				.getExecutionEnvironment();
    +
    +		// get the data set
    +		DataSet<Tuple> file = getDataSet(env);
    +
    +		// filter lines with empty fields
    +		DataSet<Tuple> filteredLines = file.filter(new FilterFunction<Tuple>() {
    +
    +			// create a new accumulator in each filter function instance
    +			// accumulators can be merged later on
    +			private VectorAccumulator emptyFieldCounter = new VectorAccumulator();
    +
    +			/*
    +			 * (non-Javadoc)
    +			 * 
    +			 * @see
    +			 * eu.stratosphere.api.common.functions.AbstractFunction#open(eu
    +			 * .stratosphere.configuration.Configuration)
    +			 */
    +			@Override
    +			public void open(Configuration parameters) throws Exception {
    +				super.open(parameters);
    +
    +				// register the accumulator instance
    +				getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
    +						this.emptyFieldCounter);
    +			}
    +
    +			@Override
    +			public boolean filter(Tuple t) {
    +				boolean containsEmptyFields = false;
    +
    +				// iterate over the tuple fields looking for empty ones
    +				for (int pos = 0; pos < t.getArity(); pos++) {
    +
    +					String field = t.getField(pos);
    +					if (field == null || field.trim().isEmpty()) {
    +						containsEmptyFields = true;
    +
    +						// if an empty field is encountered, update the
    +						// accumulator
    +						this.emptyFieldCounter.add(pos);
    +					}
    +				}
    +
    +				return !containsEmptyFields;
    +			}
    +		});
    +
    +		// Here, we could do further processing with the filtered lines...
    +		filteredLines.writeAsCsv(outputPath);
    +
    +		// execute program
    +		JobExecutionResult result = env.execute("Accumulator example");
    +
    +		// get the accumulator result via its registration key
    +		List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
    +		System.out.format("Number of detected empty fields per column: %s\n",
    +				emptyFields);
    +
    +	}
    +
    +	// *************************************************************************
    +	// UTIL METHODS
    +	// *************************************************************************
    +
    +	private static String filePath;
    +	private static String outputPath;
    +
    +	private static boolean parseParameters(String[] programArguments) {
    +
    +		if (programArguments.length > 0) {
    +			if (programArguments.length == 2) {
    +				filePath = programArguments[0];
    +				outputPath = programArguments[1];
    +			} else {
    +				System.err
    +						.println("Usage: FilterAndCountIncompleteLines <input file path> <result path>");
    +				return false;
    +			}
    +		} else {
    +			System.err
    +					.println("This program expects a semicolon-delimited CSV file with nine columns.\n"
    +							+ "  Usage: FilterAndCountIncompleteLines <input file path> <result path>");
    +			return false;
    +		}
    +		return true;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private static DataSource<Tuple> getDataSet(ExecutionEnvironment env) {
    +
    +		DataSource<? extends Tuple> source = env
    +				.readCsvFile(filePath)
    +				.fieldDelimiter(';')
    +				.includeFields("111111111")
    +				.types(String.class, String.class, String.class, String.class,
    +						String.class, String.class, String.class, String.class,
    +						String.class);
    +		return (DataSource<Tuple>) source;
    +	}
    +
    +	/**
    +	 * This accumulator lets you increase vector components distributedly. The
    +	 * {@link #add(Integer)} method lets you increase the <i>n</i>-th vector
    +	 * component by 1, whereat <i>n</i> is the methods parameter. The size of
    +	 * the vector is automatically managed.
    +	 */
    +	public static class VectorAccumulator implements
    +			Accumulator<Integer, List<Integer>> {
    +
    +		/** Stores the accumulated vector components. */
    +		private final List<Integer> resultVector = new ArrayList<Integer>();
    +
    +		/**
    +		 * Increases the result vector component at the specified position by 1.
    +		 */
    +		@Override
    +		public void add(Integer position) {
    +			updateResultVector(position, 1);
    +		}
    +
    +		/**
    +		 * Increases the result vector component at the specified position by
    +		 * the specified delta.
    +		 */
    +		private void updateResultVector(int position, int delta) {
    +			// inflate the vector to contain the given position
    +			while (resultVector.size() <= position) {
    +				resultVector.add(0);
    +			}
    +
    +			// increment the component value
    +			int component = resultVector.get(position);
    +			this.resultVector.set(position, component + delta);
    +		}
    +
    +		/*
    +		 * (non-Javadoc)
    +		 * 
    +		 * @see
    +		 * eu.stratosphere.api.common.accumulators.Accumulator#getLocalValue()
    +		 */
    +		@Override
    +		public List<Integer> getLocalValue() {
    +			return this.resultVector;
    +		}
    +
    +		/*
    +		 * (non-Javadoc)
    +		 * 
    +		 * @see eu.stratosphere.api.common.accumulators.Accumulator#resetLocal()
    +		 */
    +		@Override
    +		public void resetLocal() {
    +			// clear the result vector if the accumulator instance shall be reused
    +			this.resultVector.clear();
    +		}
    +
    +		/*
    +		 * (non-Javadoc)
    +		 * 
    +		 * @see
    +		 * eu.stratosphere.api.common.accumulators.Accumulator#merge(eu.stratosphere
    +		 * .api.common.accumulators.Accumulator)
    +		 */
    +		@Override
    +		public void merge(Accumulator<Integer, List<Integer>> other) {
    +			
    +			// merge two vector accumulators by adding their up their vector components
    +			List<Integer> otherVector = other.getLocalValue();
    +			for (int index = 0; index < otherVector.size(); index++) {
    +				updateResultVector(index, otherVector.get(index));
    +			}
    +		}
    +
    +		/*
    +		 * (non-Javadoc)
    +		 * 
    +		 * @see
    +		 * eu.stratosphere.core.io.IOReadableWritable#write(java.io.DataOutput)
    --- End diff --
    
    Can you remove these Javadocs? Every modern IDE allows to jump to the method definition in the interface.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14443047
  
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java ---
    @@ -0,0 +1,281 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.operators.DataSource;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing so, it
    + * counts the number of empty fields per column within a CSV file using a custom
    + * accumulator for vectors. In this context, empty fields are those, that at
    + * most contain whitespace characters like space and tab.
    + * 
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field separator
    + * and double quotes as field delimiters and 9 columns. See
    + * {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * 
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines &lt;input file path&gt; &lt;result path&gt;</code> <br>
    + * 
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +	// *************************************************************************
    +	// PROGRAM
    +	// *************************************************************************
    +
    +	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment
    +				.getExecutionEnvironment();
    +
    +		// get the data set
    +		DataSet<Tuple> file = getDataSet(env);
    +
    +		// filter lines with empty fields
    +		DataSet<Tuple> filteredLines = file.filter(new FilterFunction<Tuple>() {
    +
    +			// create a new accumulator in each filter function instance
    +			// accumulators can be merged later on
    +			private VectorAccumulator emptyFieldCounter = new VectorAccumulator();
    +
    +			/*
    +			 * (non-Javadoc)
    +			 * 
    +			 * @see
    +			 * eu.stratosphere.api.common.functions.AbstractFunction#open(eu
    +			 * .stratosphere.configuration.Configuration)
    +			 */
    +			@Override
    +			public void open(Configuration parameters) throws Exception {
    +				super.open(parameters);
    +
    +				// register the accumulator instance
    +				getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
    +						this.emptyFieldCounter);
    +			}
    +
    +			@Override
    +			public boolean filter(Tuple t) {
    +				boolean containsEmptyFields = false;
    +
    +				// iterate over the tuple fields looking for empty ones
    +				for (int pos = 0; pos < t.getArity(); pos++) {
    +
    +					String field = t.getField(pos);
    +					if (field == null || field.trim().isEmpty()) {
    +						containsEmptyFields = true;
    +
    +						// if an empty field is encountered, update the
    +						// accumulator
    +						this.emptyFieldCounter.add(pos);
    +					}
    +				}
    +
    +				return !containsEmptyFields;
    +			}
    +		});
    +
    +		// Here, we could do further processing with the filtered lines...
    +		filteredLines.writeAsCsv(outputPath);
    +
    +		// execute program
    +		JobExecutionResult result = env.execute("Accumulator example");
    +
    +		// get the accumulator result via its registration key
    +		List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
    +		System.out.format("Number of detected empty fields per column: %s\n",
    +				emptyFields);
    +
    +	}
    +
    +	// *************************************************************************
    +	// UTIL METHODS
    +	// *************************************************************************
    +
    +	private static String filePath;
    +	private static String outputPath;
    +
    +	private static boolean parseParameters(String[] programArguments) {
    +
    +		if (programArguments.length > 0) {
    +			if (programArguments.length == 2) {
    +				filePath = programArguments[0];
    +				outputPath = programArguments[1];
    +			} else {
    +				System.err
    +						.println("Usage: FilterAndCountIncompleteLines <input file path> <result path>");
    +				return false;
    +			}
    +		} else {
    +			System.err
    +					.println("This program expects a semicolon-delimited CSV file with nine columns.\n"
    +							+ "  Usage: FilterAndCountIncompleteLines <input file path> <result path>");
    +			return false;
    +		}
    +		return true;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private static DataSource<Tuple> getDataSet(ExecutionEnvironment env) {
    +
    +		DataSource<? extends Tuple> source = env
    +				.readCsvFile(filePath)
    +				.fieldDelimiter(';')
    +				.includeFields("111111111")
    --- End diff --
    
    I don't think you need this line. The reader selects all fields by default.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14443022
  
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java ---
    @@ -0,0 +1,281 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.operators.DataSource;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing so, it
    + * counts the number of empty fields per column within a CSV file using a custom
    + * accumulator for vectors. In this context, empty fields are those, that at
    + * most contain whitespace characters like space and tab.
    + * 
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field separator
    + * and double quotes as field delimiters and 9 columns. See
    + * {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * 
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines &lt;input file path&gt; &lt;result path&gt;</code> <br>
    + * 
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +	// *************************************************************************
    +	// PROGRAM
    +	// *************************************************************************
    +
    +	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment
    +				.getExecutionEnvironment();
    +
    +		// get the data set
    +		DataSet<Tuple> file = getDataSet(env);
    +
    +		// filter lines with empty fields
    +		DataSet<Tuple> filteredLines = file.filter(new FilterFunction<Tuple>() {
    +
    +			// create a new accumulator in each filter function instance
    +			// accumulators can be merged later on
    +			private VectorAccumulator emptyFieldCounter = new VectorAccumulator();
    +
    +			/*
    +			 * (non-Javadoc)
    +			 * 
    +			 * @see
    +			 * eu.stratosphere.api.common.functions.AbstractFunction#open(eu
    +			 * .stratosphere.configuration.Configuration)
    +			 */
    +			@Override
    +			public void open(Configuration parameters) throws Exception {
    +				super.open(parameters);
    +
    +				// register the accumulator instance
    +				getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
    +						this.emptyFieldCounter);
    +			}
    +
    +			@Override
    +			public boolean filter(Tuple t) {
    +				boolean containsEmptyFields = false;
    +
    +				// iterate over the tuple fields looking for empty ones
    +				for (int pos = 0; pos < t.getArity(); pos++) {
    +
    +					String field = t.getField(pos);
    +					if (field == null || field.trim().isEmpty()) {
    +						containsEmptyFields = true;
    +
    +						// if an empty field is encountered, update the
    +						// accumulator
    +						this.emptyFieldCounter.add(pos);
    +					}
    +				}
    +
    +				return !containsEmptyFields;
    +			}
    +		});
    +
    +		// Here, we could do further processing with the filtered lines...
    +		filteredLines.writeAsCsv(outputPath);
    +
    +		// execute program
    +		JobExecutionResult result = env.execute("Accumulator example");
    +
    +		// get the accumulator result via its registration key
    +		List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
    +		System.out.format("Number of detected empty fields per column: %s\n",
    +				emptyFields);
    +
    +	}
    +
    +	// *************************************************************************
    +	// UTIL METHODS
    +	// *************************************************************************
    +
    +	private static String filePath;
    +	private static String outputPath;
    +
    +	private static boolean parseParameters(String[] programArguments) {
    +
    +		if (programArguments.length > 0) {
    +			if (programArguments.length == 2) {
    +				filePath = programArguments[0];
    +				outputPath = programArguments[1];
    +			} else {
    +				System.err
    +						.println("Usage: FilterAndCountIncompleteLines <input file path> <result path>");
    --- End diff --
    
    Can you turn this into `System.err.println` without the newline and tabs in between?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14443079
  
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java ---
    @@ -0,0 +1,281 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.operators.DataSource;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing so, it
    + * counts the number of empty fields per column within a CSV file using a custom
    + * accumulator for vectors. In this context, empty fields are those, that at
    + * most contain whitespace characters like space and tab.
    + * 
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field separator
    + * and double quotes as field delimiters and 9 columns. See
    + * {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * 
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines &lt;input file path&gt; &lt;result path&gt;</code> <br>
    + * 
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +	// *************************************************************************
    +	// PROGRAM
    +	// *************************************************************************
    +
    +	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment
    +				.getExecutionEnvironment();
    +
    +		// get the data set
    +		DataSet<Tuple> file = getDataSet(env);
    +
    +		// filter lines with empty fields
    +		DataSet<Tuple> filteredLines = file.filter(new FilterFunction<Tuple>() {
    +
    +			// create a new accumulator in each filter function instance
    +			// accumulators can be merged later on
    +			private VectorAccumulator emptyFieldCounter = new VectorAccumulator();
    +
    +			/*
    +			 * (non-Javadoc)
    +			 * 
    +			 * @see
    +			 * eu.stratosphere.api.common.functions.AbstractFunction#open(eu
    +			 * .stratosphere.configuration.Configuration)
    +			 */
    +			@Override
    +			public void open(Configuration parameters) throws Exception {
    +				super.open(parameters);
    +
    +				// register the accumulator instance
    +				getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
    +						this.emptyFieldCounter);
    +			}
    +
    +			@Override
    +			public boolean filter(Tuple t) {
    +				boolean containsEmptyFields = false;
    +
    +				// iterate over the tuple fields looking for empty ones
    +				for (int pos = 0; pos < t.getArity(); pos++) {
    +
    +					String field = t.getField(pos);
    +					if (field == null || field.trim().isEmpty()) {
    +						containsEmptyFields = true;
    +
    +						// if an empty field is encountered, update the
    +						// accumulator
    +						this.emptyFieldCounter.add(pos);
    +					}
    +				}
    +
    +				return !containsEmptyFields;
    +			}
    +		});
    +
    +		// Here, we could do further processing with the filtered lines...
    +		filteredLines.writeAsCsv(outputPath);
    +
    +		// execute program
    +		JobExecutionResult result = env.execute("Accumulator example");
    +
    +		// get the accumulator result via its registration key
    +		List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
    +		System.out.format("Number of detected empty fields per column: %s\n",
    +				emptyFields);
    +
    +	}
    +
    +	// *************************************************************************
    +	// UTIL METHODS
    +	// *************************************************************************
    +
    +	private static String filePath;
    +	private static String outputPath;
    +
    +	private static boolean parseParameters(String[] programArguments) {
    --- End diff --
    
    It would be cool to ship the example with a build-in dataset. This way, users don't need to generate / provide the input data. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: providing accumulator example as pro...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/55#issuecomment-47889257
  
    Okay, I think the pull request is good to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---