You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2015/12/08 13:57:11 UTC
[jira] [Commented] (FLINK-3138) Method References are not supported
as lambda expressions
[ https://issues.apache.org/jira/browse/FLINK-3138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15046828#comment-15046828 ]
Aljoscha Krettek commented on FLINK-3138:
-----------------------------------------
How does this manifest?
I changed the WordCount (streaming) example in {{flink-java8}} to this and it still works:
{code}
public class WordCount {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static String keyIt(Tuple2<String, Integer> e) {
return e.f0;
}
public static void main(String[] args) throws Exception {
if(!parseParameters(args)) {
return;
}
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data
DataStream<String> text = getTextDataStream(env);
DataStream<Tuple2<String, Integer>> counts =
// normalize and split each line
text.map(line -> line.toLowerCase().split("\\W+")).returns(String[].class)
// convert splitted line in pairs (2-tuples) containing: (word,1)
.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
// emit the pairs with non-zero-length words
Arrays.stream(tokens)
.filter(t -> t.length() > 0)
.forEach(t -> out.collect(new Tuple2<>(t, 1)));
}).returns("Tuple2<String, Integer>")
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(WordCount::keyIt)
.sum(1);
// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, 1);
} else {
counts.print();
}
// execute program
env.execute("Streaming WordCount Example");
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
private static boolean parseParameters(String[] args) {
if(args.length > 0) {
// parse input arguments
fileOutput = true;
if(args.length == 2) {
textPath = args[0];
outputPath = args[1];
} else {
System.err.println("Usage: WordCount <text path> <result path>");
return false;
}
} else {
System.out.println("Executing WordCount example with built-in default data.");
System.out.println(" Provide parameters to read input data from a file.");
System.out.println(" Usage: WordCount <text path> <result path>");
}
return true;
}
private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
if (fileOutput) {
// read the text file from given input path
return env.readTextFile(textPath);
} else {
// get default test text data
return env.fromElements(WordCountData.WORDS);
}
}
}
{code}
> Method References are not supported as lambda expressions
> ---------------------------------------------------------
>
> Key: FLINK-3138
> URL: https://issues.apache.org/jira/browse/FLINK-3138
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 0.10.2
> Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> For many functions (here for example KeySelectors), one can use lambda expressions:
> {code}
> DataStream<MyType> stream = ...;
> stream.keyBy( v -> v.getId() )
> {code}
> Java's other syntax for this, Method References, do not work:
> {code}
> DataStream<MyType> stream = ...;
> stream.keyBy( MyType::getId )
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)