Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2015/12/08 13:57:11 UTC

[jira] [Commented] (FLINK-3138) Method References are not supported as lambda expressions

    [ https://issues.apache.org/jira/browse/FLINK-3138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15046828#comment-15046828 ] 

Aljoscha Krettek commented on FLINK-3138:
-----------------------------------------

How does this manifest?

I changed the streaming WordCount example in {{flink-java8}} to use a method reference ({{WordCount::keyIt}}) as the key selector, and it still works:
{code}
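import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple2;
// WordCountData comes from Flink's Java examples module; this package is assumed
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {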
	
	// *************************************************************************
	//     PROGRAM
	// *************************************************************************

	public static String keyIt(Tuple2<String, Integer> e) {
		return e.f0;
	}

	public static void main(String[] args) throws Exception {
		
		if(!parseParameters(args)) {
			return;
		}
		
		// set up the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		// get input data
		DataStream<String> text = getTextDataStream(env);
		
		DataStream<Tuple2<String, Integer>> counts = 
				// normalize and split each line
				text.map(line -> line.toLowerCase().split("\\W+")).returns(String[].class)
				// convert each split line into pairs (2-tuples) of the form (word, 1)
				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
					// emit the pairs with non-zero-length words
					Arrays.stream(tokens)
					.filter(t -> t.length() > 0)
					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
				}).returns("Tuple2<String, Integer>")
				// group by the tuple field "0" and sum up tuple field "1"
				.keyBy(WordCount::keyIt)
				.sum(1);

		// emit result
		if(fileOutput) {
			counts.writeAsCsv(outputPath, 1);
		} else {
			counts.print();
		}
		
		// execute program
		env.execute("Streaming WordCount Example");
	}
	
	// *************************************************************************
	//     UTIL METHODS
	// *************************************************************************
	
	private static boolean fileOutput = false;
	private static String textPath;
	private static String outputPath;
	
	private static boolean parseParameters(String[] args) {
		
		if(args.length > 0) {
			// parse input arguments
			fileOutput = true;
			if(args.length == 2) {
				textPath = args[0];
				outputPath = args[1];
			} else {
				System.err.println("Usage: WordCount <text path> <result path>");
				return false;
			}
		} else {
			System.out.println("Executing WordCount example with built-in default data.");
			System.out.println("  Provide parameters to read input data from a file.");
			System.out.println("  Usage: WordCount <text path> <result path>");
		}
		return true;
	}
	
	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
		if (fileOutput) {
			// read the text file from given input path
			return env.readTextFile(textPath);
		} else {
			// get default test text data
			return env.fromElements(WordCountData.WORDS);
		}
	}
}
{code}
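
For reference, here is a minimal, Flink-independent sketch of why the two forms should be interchangeable at the Java language level. The {{KeySelector}} interface below is a hypothetical stand-in that only mimics the shape of Flink's interface (one method, serializable), and {{MyType}}, {{sumByKey}} and {{MethodRefKeySketch}} are names made up for illustration. It says nothing about Flink's type extraction, which may be where a difference shows up.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MethodRefKeySketch {

    // Hypothetical stand-in for a key selector: a serializable single-method interface.
    // This is NOT Flink's class, only an interface with a similar shape.
    @FunctionalInterface
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical element type, mirroring "MyType" from the issue description.
    static final class MyType {
        private final String id;
        private final int count;

        MyType(String id, int count) {
            this.id = id;
            this.count = count;
        }

        String getId() {
            return id;
        }

        int getCount() {
            return count;
        }
    }

    // Sums the counts per key. Any KeySelector works here, whether it was
    // written as a lambda or as a method reference.
    static Map<String, Integer> sumByKey(List<MyType> values,
                                         KeySelector<MyType, String> selector) throws Exception {
        Map<String, Integer> sums = new TreeMap<>();
        for (MyType v : values) {
            sums.merge(selector.getKey(v), v.getCount(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) throws Exception {
        List<MyType> data = Arrays.asList(
                new MyType("a", 1), new MyType("b", 2), new MyType("a", 3));

        // Lambda form, as in the first snippet of the issue description.
        Map<String, Integer> viaLambda = sumByKey(data, v -> v.getId());
        // Method reference form, as in the second snippet.
        Map<String, Integer> viaMethodRef = sumByKey(data, MyType::getId);

        System.out.println(viaLambda);
        System.out.println(viaMethodRef);
    }
}
```

Both calls compile against the same functional interface and produce the same grouping, so a failure with the method reference form would have to come from something Flink does on top of plain Java.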

> Method References are not supported as lambda expressions
> ---------------------------------------------------------
>
>                 Key: FLINK-3138
>                 URL: https://issues.apache.org/jira/browse/FLINK-3138
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.10.2
>            Reporter: Stephan Ewen
>             Fix For: 1.0.0
>
>
> For many functions (here for example KeySelectors), one can use lambda expressions:
> {code}
> DataStream<MyType> stream = ...;
> stream.keyBy( v -> v.getId() )
> {code}
> Java's other syntax for this, method references, does not work:
> {code}
> DataStream<MyType> stream = ...;
> stream.keyBy( MyType::getId )
> {code}


