You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:39:57 UTC
[26/39] git commit: [FLINK-701] Change KeySelector to a SAM interface
[FLINK-701] Change KeySelector to a SAM interface
This closes #85
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/934e4e00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/934e4e00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/934e4e00
Branch: refs/heads/travis_test
Commit: 934e4e00df012b4aab128294c05153d0c46f9887
Parents: bc89e91
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 31 20:31:54 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 1 02:33:34 2014 +0200
----------------------------------------------------------------------
.../testfunctions/IdentityKeyExtractor.java | 3 +-
.../flink/api/java/functions/KeySelector.java | 10 ++--
.../flink/api/java/typeutils/TypeExtractor.java | 9 +++-
.../java/type/extractor/TypeExtractorTest.java | 23 +++++++++
.../lambdas/KeySelectorTest.java | 52 ++++++++++++++++++++
5 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
index 39ef821..7e7ba16 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
@@ -16,12 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.testfunctions;
import org.apache.flink.api.java.functions.KeySelector;
-public class IdentityKeyExtractor<T> extends KeySelector<T, T> {
+public class IdentityKeyExtractor<T> implements KeySelector<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
index ede7c32..1dcc8c6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.functions;
+import org.apache.flink.api.common.functions.Function;
/**
* The {@link KeySelector} allows to use arbitrary objects for operations such as
@@ -28,9 +29,7 @@ package org.apache.flink.api.java.functions;
* @param <IN> Type of objects to extract the key from.
* @param <KEY> Type of key.
*/
-public abstract class KeySelector<IN, KEY> implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
+public interface KeySelector<IN, KEY> extends Function, java.io.Serializable {
/**
* User-defined function that extracts the key from an arbitrary object.
@@ -54,6 +53,9 @@ public abstract class KeySelector<IN, KEY> implements java.io.Serializable {
*
* @param value The object to get the key from.
* @return The extracted key.
+ *
+ * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
+ * and trigger recovery or cancellation of the program.
*/
- public abstract KEY getKey(IN value);
+ KEY getKey(IN value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index e8ee0bb..d03cc49 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -38,9 +38,11 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.functions.InvalidTypesException;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.types.Value;
@@ -134,6 +136,10 @@ public class TypeExtractor {
@SuppressWarnings("unchecked")
public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> inType) {
+ if (FunctionUtils.isLambdaFunction(selectorInterface)) {
+ throw new UnsupportedLambdaExpressionException();
+ }
+
validateInputType(KeySelector.class, selectorInterface.getClass(), 0, inType);
if(selectorInterface instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) selectorInterface).getProducedType();
@@ -406,7 +412,8 @@ public class TypeExtractor {
return parameter;
}
- throw new IllegalArgumentException(baseClass.getName() + " must be implemented.");
+ throw new IllegalArgumentException("The types of the interface " + baseClass.getName() + " could not be inferred. " +
+ "Support for synthetic interfaces, lambdas, and generic types is limited at this point.");
}
private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index 8346d00..d5044a8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -1430,4 +1430,27 @@ public class TypeExtractorTest {
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO);
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
}
+
+ @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
+ @Test
+ public void testExtractKeySelector() {
+ KeySelector<String, Integer> selector = new KeySelector<String, Integer>() {
+ @Override
+ public Integer getKey(String value) { return null; }
+ };
+
+ TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(selector, BasicTypeInfo.STRING_TYPE_INFO);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+
+ try {
+ TypeExtractor.getKeySelectorTypes((KeySelector) selector, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+ Assert.fail();
+ }
+ catch (InvalidTypesException e) {
+ // good
+ }
+ catch (Exception e) {
+ Assert.fail("wrong exception type");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
new file mode 100644
index 0000000..fd61e25
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+
+public class KeySelectorTest {
+
+ public void testSelectorLambda() {
+ try {
+ KeySelector<Tuple2<String, Integer>, String> selector = (t) -> t.f0;
+
+ try {
+ TypeExtractor.getKeySelectorTypes(selector,
+ new TupleTypeInfo<Tuple2<String, Integer>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
+ Assert.fail("No unsupported lambdas exception");
+ }
+ catch (UnsupportedLambdaExpressionException e) {
+ // good
+ }
+ catch (Exception e) {
+ Assert.fail("Wrong exception type");
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+}