You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:39:57 UTC

[26/39] git commit: [FLINK-701] Change KeySelector to a SAM interface

[FLINK-701] Change KeySelector to a SAM interface

This closes #85


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/934e4e00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/934e4e00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/934e4e00

Branch: refs/heads/travis_test
Commit: 934e4e00df012b4aab128294c05153d0c46f9887
Parents: bc89e91
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 31 20:31:54 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 1 02:33:34 2014 +0200

----------------------------------------------------------------------
 .../testfunctions/IdentityKeyExtractor.java     |  3 +-
 .../flink/api/java/functions/KeySelector.java   | 10 ++--
 .../flink/api/java/typeutils/TypeExtractor.java |  9 +++-
 .../java/type/extractor/TypeExtractorTest.java  | 23 +++++++++
 .../lambdas/KeySelectorTest.java                | 52 ++++++++++++++++++++
 5 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
index 39ef821..7e7ba16 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
@@ -16,12 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.testfunctions;
 
 import org.apache.flink.api.java.functions.KeySelector;
 
-public class IdentityKeyExtractor<T> extends KeySelector<T, T> {
+public class IdentityKeyExtractor<T> implements KeySelector<T, T> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
index ede7c32..1dcc8c6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.functions;
 
+import org.apache.flink.api.common.functions.Function;
 
 /**
  * The {@link KeySelector} allows to use arbitrary objects for operations such as
@@ -28,9 +29,7 @@ package org.apache.flink.api.java.functions;
  * @param <IN> Type of objects to extract the key from.
  * @param <KEY> Type of key.
  */
-public abstract class KeySelector<IN, KEY> implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
+public interface KeySelector<IN, KEY> extends Function, java.io.Serializable {
 	
 	/**
 	 * User-defined function that extracts the key from an arbitrary object.
@@ -54,6 +53,9 @@ public abstract class KeySelector<IN, KEY> implements java.io.Serializable {
 	 * 
 	 * @param value The object to get the key from.
 	 * @return The extracted key.
+	 * 
+	 * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
+	 *                   and trigger recovery or cancellation of the program. 
 	 */
-	public abstract KEY getKey(IN value);
+	KEY getKey(IN value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index e8ee0bb..d03cc49 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -38,9 +38,11 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.functions.InvalidTypesException;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.types.Value;
@@ -134,6 +136,10 @@ public class TypeExtractor {
 	
 	@SuppressWarnings("unchecked")
 	public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> inType) {
+		if (FunctionUtils.isLambdaFunction(selectorInterface)) {
+			throw new UnsupportedLambdaExpressionException();
+		}
+		
 		validateInputType(KeySelector.class, selectorInterface.getClass(), 0, inType);
 		if(selectorInterface instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable<OUT>) selectorInterface).getProducedType();
@@ -406,7 +412,8 @@ public class TypeExtractor {
 			return parameter;
 		}
 		
-		throw new IllegalArgumentException(baseClass.getName() + " must be implemented.");
+		throw new IllegalArgumentException("The types of the interface " + baseClass.getName() + " could not be inferred. " + 
+						"Support for synthetic interfaces, lambdas, and generic types is limited at this point.");
 	}
 	
 	private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index 8346d00..d5044a8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -1430,4 +1430,27 @@ public class TypeExtractorTest {
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO);
 		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
 	}
+	
+	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
+	@Test
+	public void testExtractKeySelector() { // checks key-type extraction for an anonymous KeySelector, then input-type validation
+		KeySelector<String, Integer> selector = new KeySelector<String, Integer>() {
+			@Override
+			public Integer getKey(String value) { return null; } // return value is irrelevant; this test never calls getKey
+		};
+
+		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(selector, BasicTypeInfo.STRING_TYPE_INFO);
+		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+		
+		try {
+			TypeExtractor.getKeySelectorTypes((KeySelector) selector, BasicTypeInfo.BOOLEAN_TYPE_INFO); // raw cast lets Boolean type info be passed for a selector whose input type is String
+			Assert.fail();
+		}
+		catch (InvalidTypesException e) {
+			// expected: the supplied Boolean type information does not match the selector's declared String input
+		}
+		catch (Exception e) {
+			Assert.fail("wrong exception type");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
new file mode 100644
index 0000000..fd61e25
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+
+/** Verifies that TypeExtractor.getKeySelectorTypes rejects a Java 8 lambda passed as a KeySelector. */
+public class KeySelectorTest {
+	@org.junit.Test // fully qualified (file imports only org.junit.Assert); without it JUnit 4 never runs this method
+	public void testSelectorLambda() {
+		try {
+			KeySelector<Tuple2<String, Integer>, String> selector = (t) -> t.f0;
+			try {
+				TypeExtractor.getKeySelectorTypes(selector, 
+						new TupleTypeInfo<Tuple2<String, Integer>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
+				Assert.fail("No unsupported lambdas exception");
+			}
+			catch (UnsupportedLambdaExpressionException e) {
+				// expected: the extractor explicitly refuses lambda-based selectors
+			}
+			catch (Exception e) {
+				Assert.fail("Wrong exception type");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+}