You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/20 06:07:21 UTC
[2/3] flink git commit: [FLINK-7251] [types] Remove the flink-java8
module and improve lambda type extraction
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
deleted file mode 100644
index b9dba77..0000000
--- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.java8.wordcount;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-import java.util.Arrays;
-
-/**
- * Implements the streaming "WordCount" program that computes a simple word occurrences
- * over text files.
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCount <text path> <result path></code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>write a compact Flink Streaming program with Java 8 Lambda Expressions.
- * </ul>
- *
- */
-public class WordCount {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- DataStream<String> text = getTextDataStream(env);
-
- DataStream<Tuple2<String, Integer>> counts =
- // normalize and split each line
- text.map(line -> line.toLowerCase().split("\\W+"))
- // convert split line in pairs (2-tuples) containing: (word,1)
- .flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
- // emit the pairs with non-zero-length words
- Arrays.stream(tokens)
- .filter(t -> t.length() > 0)
- .forEach(t -> out.collect(new Tuple2<>(t, 1)));
- })
- // group by the tuple field "0" and sum up tuple field "1"
- .keyBy(0)
- .sum(1);
-
- // emit result
- if (fileOutput) {
- counts.writeAsCsv(outputPath);
- } else {
- counts.print();
- }
-
- // execute program
- env.execute("Streaming WordCount Example");
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String textPath;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 2) {
- textPath = args[0];
- outputPath = args[1];
- } else {
- System.err.println("Usage: WordCount <text path> <result path>");
- return false;
- }
- } else {
- System.out.println("Executing WordCount example with built-in default data.");
- System.out.println(" Provide parameters to read input data from a file.");
- System.out.println(" Usage: WordCount <text path> <result path>");
- }
- return true;
- }
-
- private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
- if (fileOutput) {
- // read the text file from given input path
- return env.readTextFile(textPath);
- } else {
- // get default test text data
- return env.fromElements(WordCountData.WORDS);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
deleted file mode 100644
index de1f395..0000000
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.type.lambdas;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.lang.reflect.Method;
-
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests the type extractor for lambda functions.
- */
-@SuppressWarnings("serial")
-public class LambdaExtractionTest {
-
- private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> NESTED_TUPLE_BOOLEAN_TYPE =
- new TypeHint<Tuple2<Tuple1<Integer>, Boolean>>(){}.getTypeInfo();
-
- private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> NESTED_TUPLE_DOUBLE_TYPE =
- new TypeHint<Tuple2<Tuple1<Integer>, Double>>(){}.getTypeInfo();
-
- @Test
- public void testIdentifyLambdas() {
- try {
- MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
- @Override
- public Integer map(String value) {
- return Integer.parseInt(value);
- }
- };
-
- MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
- @Override
- public Integer map(String value) {
- return Integer.parseInt(value);
- }
- };
-
- MapFunction<?, ?> fromProperClass = new StaticMapper();
-
- MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
- @Override
- public Tuple2<Integer, Long> map(Integer value) {
- return new Tuple2<>(value, 1L);
- }
- };
-
- MapFunction<String, Integer> staticLambda = Integer::parseInt;
- MapFunction<Integer, String> instanceLambda = Object::toString;
- MapFunction<String, Integer> constructorLambda = Integer::new;
-
- assertNull(checkAndExtractLambda(anonymousFromInterface));
- assertNull(checkAndExtractLambda(anonymousFromClass));
- assertNull(checkAndExtractLambda(fromProperClass));
- assertNull(checkAndExtractLambda(fromDerived));
- assertNotNull(checkAndExtractLambda(staticLambda));
- assertNotNull(checkAndExtractLambda(instanceLambda));
- assertNotNull(checkAndExtractLambda(constructorLambda));
- assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private static class StaticMapper implements MapFunction<String, Integer> {
- @Override
- public Integer map(String value) {
- return Integer.parseInt(value);
- }
- }
-
- private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
- @Override
- Tuple2<T, Long> map(T value) throws Exception;
- }
-
- private static final MapFunction<String, Integer> STATIC_LAMBDA = Integer::parseInt;
-
- private static class MyClass {
- private String s = "mystring";
-
- public MapFunction<Integer, String> getMapFunction() {
- return (i) -> s;
- }
- }
-
- @Test
- public void testLambdaWithMemberVariable() {
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), Types.INT);
- Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
- }
-
- @Test
- public void testLambdaWithLocalVariable() {
- String s = "mystring";
- final int k = 24;
- int j = 26;
-
- MapFunction<Integer, String> f = (i) -> s + k + j;
-
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, Types.INT);
- Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
- }
-
- @Test
- public void testMapLambda() {
- MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @Test
- public void testFlatMapLambda() {
- FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
- TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @Test
- public void testMapPartitionLambda() {
- MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
- TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @Test
- public void testGroupReduceLambda() {
- GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
- TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @Test
- public void testFlatJoinLambda() {
- FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
-
- TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @Test
- public void testJoinLambda() {
- JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
-
- TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @Test
- public void testCoGroupLambda() {
- CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
-
- TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @Test
- public void testCrossLambda() {
- CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
-
- TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @Test
- public void testKeySelectorLambda() {
- KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-
- TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
- if (!(ti instanceof MissingTypeInfo)) {
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
- }
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testLambdaTypeErasure() {
- MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
- Assert.assertTrue(ti instanceof MissingTypeInfo);
- }
-
- @Test
- public void testPartitionerLambda() {
- Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
- final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);
-
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-
- }
-
- private static class MyType {
- private int key;
-
- public int getKey() {
- return key;
- }
-
- public void setKey(int key) {
- this.key = key;
- }
-
- protected int getKey2() {
- return 0;
- }
- }
-
- @Test
- public void testInstanceMethodRefSameType() {
- MapFunction<MyType, Integer> f = MyType::getKey;
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
- }
-
- @Test
- public void testInstanceMethodRefSuperType() {
- MapFunction<Integer, String> f = Object::toString;
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
- Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
- }
-
- private static class MySubtype extends MyType {
- public boolean test;
- }
-
- @Test
- public void testInstanceMethodRefSuperTypeProtected() {
- MapFunction<MySubtype, Integer> f = MyType::getKey2;
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
- }
-
- @Test
- public void testConstructorMethodRef() {
- MapFunction<String, Integer> f = Integer::new;
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
- }
-
- private interface InterfaceWithDefaultMethod {
- void samMethod();
-
- default void defaultMethod() {
-
- }
- }
-
- @Test
- public void testSamMethodExtractionInterfaceWithDefaultMethod() {
- final Method sam = TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithDefaultMethod.class);
- assertNotNull(sam);
- assertEquals("samMethod", sam.getName());
- }
-
- private interface InterfaceWithMultipleMethods {
- void firstMethod();
-
- void secondMethod();
- }
-
- @Test(expected = InvalidTypesException.class)
- public void getSingleAbstractMethodMultipleMethods() throws Exception {
- TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithMultipleMethods.class);
- }
-
- private interface InterfaceWithoutAbstractMethod {
- default void defaultMethod() {
-
- }
- }
-
- @Test(expected = InvalidTypesException.class)
- public void getSingleAbstractMethodNoAbstractMethods() throws Exception {
- TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithoutAbstractMethod.class);
- }
-
- private abstract class AbstractClassWithSingleAbstractMethod {
- public abstract void defaultMethod();
- }
-
- @Test(expected = InvalidTypesException.class)
- public void getSingleAbstractMethodNotAnInterface() throws Exception {
- TypeExtractionUtils.getSingleAbstractMethod(AbstractClassWithSingleAbstractMethod.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
deleted file mode 100644
index 7cbdf6a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for lambda support in CEP.
- */
-public class CEPLambdaTest extends TestLogger {
- /**
- * Test event class.
- */
- public static class EventA {}
-
- /**
- * Test event class.
- */
- public static class EventB {}
-
- /**
- * Tests that a Java8 lambda can be passed as a CEP select function.
- */
- @Test
- public void testLambdaSelectFunction() {
- TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
- TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
-
- DataStream<EventA> inputStream = new DataStream<>(
- StreamExecutionEnvironment.getExecutionEnvironment(),
- new SourceTransformation<>(
- "source",
- null,
- eventTypeInformation,
- 1));
-
- Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
-
- PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
-
- DataStream<EventB> result = patternStream.select(
- (Map<String, List<EventA>> map) -> new EventB()
- );
-
- assertEquals(outputTypeInformation, result.getType());
- }
-
- /**
- * Tests that a Java8 lambda can be passed as a CEP flat select function.
- */
- @Test
- public void testLambdaFlatSelectFunction() {
- TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
- TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
-
- DataStream<EventA> inputStream = new DataStream<>(
- StreamExecutionEnvironment.getExecutionEnvironment(),
- new SourceTransformation<>(
- "source",
- null,
- eventTypeInformation,
- 1));
-
- Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
-
- PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
-
- DataStream<EventB> result = patternStream.flatSelect(
- (Map<String, List<EventA>> map, Collector<EventB> collector) -> collector.collect(new EventB())
- );
-
- assertEquals(outputTypeInformation, result.getType());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
deleted file mode 100644
index ca11275..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda1;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda2;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda3;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda4;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.jar.JarInputStream;
-import java.util.zip.ZipEntry;
-
-/**
- * Tests for the {@link JarFileCreator}.
- */
-public class JarFileCreatorLambdaTest {
- @Test
- public void testFilterFunctionOnLambda1() throws Exception {
- File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
- JarFileCreator jfc = new JarFileCreator(out);
- jfc.addClass(FilterLambda1.class)
- .createJarFile();
-
- Set<String> ans = new HashSet<String>();
- ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda1.class");
- ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-
- Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
- out.delete();
- }
-
- @Test
- public void testFilterFunctionOnLambda2() throws Exception{
- File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
- JarFileCreator jfc = new JarFileCreator(out);
- jfc.addClass(FilterLambda2.class)
- .createJarFile();
-
- Set<String> ans = new HashSet<String>();
- ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda2.class");
- ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-
- Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
- out.delete();
- }
-
- @Test
- public void testFilterFunctionOnLambda3() throws Exception {
- File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
- JarFileCreator jfc = new JarFileCreator(out);
- jfc.addClass(FilterLambda3.class)
- .createJarFile();
-
- Set<String> ans = new HashSet<String>();
- ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda3.class");
- ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
- ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunction.class");
-
- Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
- out.delete();
- }
-
- @Test
- public void testFilterFunctionOnLambda4() throws Exception {
- File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
- JarFileCreator jfc = new JarFileCreator(out);
- jfc.addClass(FilterLambda4.class)
- .createJarFile();
-
- Set<String> ans = new HashSet<String>();
- ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda4.class");
- ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
- ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper$UtilFunction.class");
-
- Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
- out.delete();
- }
-
- public boolean validate(Set<String> expected, File out) throws Exception {
- int count = expected.size();
- try (JarInputStream jis = new JarInputStream(new FileInputStream(out))) {
- ZipEntry ze;
- while ((ze = jis.getNextEntry()) != null) {
- count--;
- expected.remove(ze.getName());
- }
- }
- return count == 0 && expected.size() == 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
deleted file mode 100644
index 12abff9..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * A lambda filter using a static method.
- */
-public class FilterLambda1 {
-
- public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
- FilterFunction<String> filter = (v) -> WordFilter.filter(v);
-
- DataSet<String> output = input.filter(filter);
- output.print();
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
deleted file mode 100644
index 9555607..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda1}, but the filter lambda is directly passed to {@link DataSet#filter(FilterFunction)}.
- */
-public class FilterLambda2 {
-
- public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
- DataSet<String> output = input.filter((v) -> WordFilter.filter(v));
- output.print();
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
deleted file mode 100644
index b493722..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda2}, but uses a getter to retrieve a lambda filter instance.
- */
-public class FilterLambda3 {
-
- public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
- DataSet<String> output = input.filter(UtilFunction.getWordFilter());
- output.print();
-
- env.execute();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
deleted file mode 100644
index 606ef5e..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda3} with additional indirection.
- */
-public class FilterLambda4 {
-
- public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
- DataSet<String> output = input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter());
- output.print();
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
deleted file mode 100644
index 1d5394a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * Static factory for a lambda filter function.
- */
-public class UtilFunction {
- public static FilterFunction<String> getWordFilter() {
- return (v) -> WordFilter.filter(v);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
deleted file mode 100644
index de8f68a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * A wrapper around {@link WordFilter} to introduce additional indirection.
- */
-public class UtilFunctionWrapper {
- /**
- * Static factory for a lambda filter function.
- */
- public static class UtilFunction {
- public static FilterFunction<String> getWordFilter() {
- return (v) -> WordFilter.filter(v);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
deleted file mode 100644
index 4a5b16f..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-/**
- * Static filter method for lambda tests.
- */
-public class WordFilter {
- public static boolean filter(String value) {
- return !value.contains("not");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
deleted file mode 100644
index cee34af..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda allreduce functions.
- */
-public class AllGroupReduceITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "aaabacad\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
- DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
- String conc = "";
- for (String s : values) {
- conc = conc.concat(s);
- }
- out.collect(conc);
- });
- concatDs.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
deleted file mode 100644
index a70f37a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cogroup functions.
- */
-public class CoGroupITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "6\n3\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
- DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
- DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
- .with((values1, values2, out) -> {
- int sum = 0;
- for (Tuple2<Integer, String> next : values1) {
- sum += next.f0;
- }
- for (Tuple2<Integer, String> next : values2) {
- sum += next.f0;
- }
- out.collect(sum);
- });
- joined.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
deleted file mode 100644
index 32cd910..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cross functions.
- */
-public class CrossITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "2,hello not\n" +
- "3,what's not\n" +
- "3,up not\n" +
- "2,hello much\n" +
- "3,what's much\n" +
- "3,up much\n" +
- "3,hello really\n" +
- "4,what's really\n" +
- "4,up really";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
- DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
- DataSet<Tuple2<Integer, String>> joined = left.cross(right)
- .with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
- joined.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
deleted file mode 100644
index 6ad1058..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda filter functions.
- */
-public class FilterITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n";
-
- public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-
- List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
- data.add(new Tuple3<>(1, 1L, "Hi"));
- data.add(new Tuple3<>(2, 2L, "Hello"));
- data.add(new Tuple3<>(3, 2L, "Hello world"));
- data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
- data.add(new Tuple3<>(5, 3L, "I am fine."));
- data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
- data.add(new Tuple3<>(7, 4L, "Comment#1"));
- data.add(new Tuple3<>(8, 4L, "Comment#2"));
- data.add(new Tuple3<>(9, 4L, "Comment#3"));
- data.add(new Tuple3<>(10, 4L, "Comment#4"));
- data.add(new Tuple3<>(11, 5L, "Comment#5"));
- data.add(new Tuple3<>(12, 5L, "Comment#6"));
- data.add(new Tuple3<>(13, 5L, "Comment#7"));
- data.add(new Tuple3<>(14, 5L, "Comment#8"));
- data.add(new Tuple3<>(15, 5L, "Comment#9"));
- data.add(new Tuple3<>(16, 6L, "Comment#10"));
- data.add(new Tuple3<>(17, 6L, "Comment#11"));
- data.add(new Tuple3<>(18, 6L, "Comment#12"));
- data.add(new Tuple3<>(19, 6L, "Comment#13"));
- data.add(new Tuple3<>(20, 6L, "Comment#14"));
- data.add(new Tuple3<>(21, 6L, "Comment#15"));
-
- Collections.shuffle(data);
-
- return env.fromCollection(data);
- }
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(value -> value.f2.contains("world"));
- filterDs.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
deleted file mode 100644
index f793450..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class FlatJoinITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "2,what's really\n" +
- "2,up really\n" +
- "1,hello not\n" +
- "1,hello much\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
- DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
- DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
- .with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
- joined.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
deleted file mode 100644
index d395d7d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda flatmap functions.
- */
-public class FlatMapITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "bb\n" +
- "bb\n" +
- "bc\n" +
- "bd\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
- DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
- flatMappedDs.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
deleted file mode 100644
index 53db541..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda groupreduce functions.
- */
-public class GroupReduceITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "abad\n" +
- "aaac\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
- new Tuple2<>(1, "aa"),
- new Tuple2<>(2, "ab"),
- new Tuple2<>(1, "ac"),
- new Tuple2<>(2, "ad")
- );
- DataSet<String> concatDs = stringDs
- .groupBy(0)
- .reduceGroup((values, out) -> {
- String conc = "";
- for (Tuple2<Integer, String> next : values) {
- conc = conc.concat(next.f1);
- }
- out.collect(conc);
- });
- concatDs.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
deleted file mode 100644
index d86ea49..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class JoinITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "2,what's really\n" +
- "2,up really\n" +
- "1,hello not\n" +
- "1,hello much\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
- DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
- DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
- .with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
- joined.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
deleted file mode 100644
index 15a9b9d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda map functions.
- */
-public class MapITCase extends JavaProgramTestBase {
-
- private static class Trade {
-
- public String v;
-
- public Trade(String v) {
- this.v = v;
- }
-
- @Override
- public String toString() {
- return v;
- }
- }
-
- private static final String EXPECTED_RESULT = "22\n" +
- "22\n" +
- "23\n" +
- "24\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
- DataSet<String> mappedDs = stringDs
- .map(Object::toString)
- .map (s -> s.replace("1", "2"))
- .map(Trade::new)
- .map(Trade::toString);
- mappedDs.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
deleted file mode 100644
index 712132c..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda reduce functions.
- */
-public class ReduceITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "3,9,0,P-),2\n" +
- "3,6,5,BCD,3\n" +
- "4,17,0,P-),1\n" +
- "4,17,0,P-),2\n" +
- "5,11,10,GHI,1\n" +
- "5,29,0,P-),2\n" +
- "5,25,0,P-),3\n";
-
- public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
- List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
- data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
- data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
- data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
- data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
- data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
- data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
- data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
- data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
- data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
- data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
- data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
- data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
- data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
- data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
- data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-
- Collections.shuffle(data);
-
- TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new
- TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>(
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO
- );
-
- return env.fromCollection(data, type);
- }
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
- .groupBy(4, 0)
- .reduce((in1, in2) -> {
- Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
- out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
- return out;
- });
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/resources/log4j-test.properties b/flink-java8/src/test/resources/log4j-test.properties
deleted file mode 100644
index c977d4c..0000000
--- a/flink-java8/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 0e6c2fe..521665f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -104,8 +104,7 @@ public class PatternStream<T> {
PatternSelectFunction.class,
0,
1,
- new int[]{0, 1, 0},
- new int[]{},
+ TypeExtractor.NO_INDEX,
inputStream.getType(),
null,
false);
@@ -173,8 +172,7 @@ public class PatternStream<T> {
PatternSelectFunction.class,
0,
1,
- new int[]{0, 1, 0},
- new int[]{},
+ TypeExtractor.NO_INDEX,
inputStream.getType(),
null,
false);
@@ -259,8 +257,7 @@ public class PatternStream<T> {
PatternSelectFunction.class,
0,
1,
- new int[]{0, 1, 0},
- new int[]{},
+ TypeExtractor.NO_INDEX,
inputStream.getType(),
null,
false);
@@ -270,8 +267,7 @@ public class PatternStream<T> {
PatternTimeoutFunction.class,
0,
1,
- new int[]{0, 1, 0},
- new int[]{},
+ TypeExtractor.NO_INDEX,
inputStream.getType(),
null,
false);
@@ -314,7 +310,6 @@ public class PatternStream<T> {
PatternFlatSelectFunction.class,
0,
1,
- new int[] {0, 1, 0},
new int[] {1, 0},
inputStream.getType(),
null,
@@ -381,7 +376,6 @@ public class PatternStream<T> {
PatternFlatSelectFunction.class,
0,
1,
- new int[]{0, 1, 0},
new int[]{1, 0},
inputStream.getType(),
null,
@@ -465,7 +459,6 @@ public class PatternStream<T> {
PatternFlatTimeoutFunction.class,
0,
1,
- new int[]{0, 1, 0},
new int[]{2, 0},
inputStream.getType(),
null,
@@ -476,7 +469,6 @@ public class PatternStream<T> {
PatternFlatSelectFunction.class,
0,
1,
- new int[]{0, 1, 0},
new int[]{1, 0},
inputStream.getType(),
null,
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 6d1013c..e397d31 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.cep;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
@@ -96,19 +97,15 @@ public class CEPITCase extends AbstractTestBase {
}
});
- DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
+ DataStream<String> result = CEP.pattern(input, pattern).flatSelect((p, o) -> {
+ StringBuilder builder = new StringBuilder();
- @Override
- public String select(Map<String, List<Event>> pattern) {
- StringBuilder builder = new StringBuilder();
+ builder.append(p.get("start").get(0).getId()).append(",")
+ .append(p.get("middle").get(0).getId()).append(",")
+ .append(p.get("end").get(0).getId());
- builder.append(pattern.get("start").get(0).getId()).append(",")
- .append(pattern.get("middle").get(0).getId()).append(",")
- .append(pattern.get("end").get(0).getId());
-
- return builder.toString();
- }
- });
+ o.collect(builder.toString());
+ }, Types.STRING);
List<String> resultList = new ArrayList<>();
@@ -170,18 +167,14 @@ public class CEPITCase extends AbstractTestBase {
}
});
- DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
-
- @Override
- public String select(Map<String, List<Event>> pattern) {
- StringBuilder builder = new StringBuilder();
+ DataStream<String> result = CEP.pattern(input, pattern).select(p -> {
+ StringBuilder builder = new StringBuilder();
- builder.append(pattern.get("start").get(0).getId()).append(",")
- .append(pattern.get("middle").get(0).getId()).append(",")
- .append(pattern.get("end").get(0).getId());
+ builder.append(p.get("start").get(0).getId()).append(",")
+ .append(p.get("middle").get(0).getId()).append(",")
+ .append(p.get("end").get(0).getId());
- return builder.toString();
- }
+ return builder.toString();
});
List<String> resultList = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 6dcf766..95310b4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -78,7 +78,6 @@ public class Translate {
TranslateFunction.class,
0,
1,
- new int[]{0},
new int[]{1},
oldType,
null,
@@ -162,7 +161,6 @@ public class Translate {
TranslateFunction.class,
0,
1,
- new int[] {0},
new int[] {1},
oldType,
null,
@@ -248,7 +246,6 @@ public class Translate {
TranslateFunction.class,
0,
1,
- new int[]{0},
new int[]{1},
oldType,
null,
@@ -332,7 +329,6 @@ public class Translate {
TranslateFunction.class,
0,
1,
- new int[]{0},
new int[]{1},
oldType,
null,
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 0ca6eb9..33399f8 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -156,25 +156,6 @@ under the License.
<pluginManagement>
<plugins>
- <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
- <!--
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- <compilerId>jdt</compilerId>
- </configuration>
- <dependencies>
- <dependency>
- <groupId>org.eclipse.tycho</groupId>
- <artifactId>tycho-compiler-jdt</artifactId>
- <version>0.21.0</version>
- </dependency>
- </dependencies>
- </plugin>
- -->
-
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>