You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/06 17:12:27 UTC

[5/5] flink git commit: [FLINK-2812] [streaming] KeySelectorUtil interacts well with type extraction

[FLINK-2812] [streaming] KeySelectorUtil interacts well with type extraction

The interaction is tested in AggregationFunctionTest and the Scala DataStreamTest, among others.

Closes #1155


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e494c279
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e494c279
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e494c279

Branch: refs/heads/master
Commit: e494c2795252a7f3db3659b1919cc7c75fc3dbb9
Parents: c414ea9
Author: mbalassi <mb...@apache.org>
Authored: Mon Oct 5 22:19:53 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 17:10:27 2015 +0200

----------------------------------------------------------------------
 .../streaming/util/keys/KeySelectorUtil.java    | 20 +++++++++++++++-----
 .../flink/streaming/api/scala/DataStream.scala  | 10 +++++++---
 2 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e494c279/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index d8839a0..9c76d95 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 /**
  * Utility class that contains helper methods to manipulating {@link KeySelector} for streaming.
@@ -47,12 +49,14 @@ public final class KeySelectorUtil {
 		
 		// use ascending order here, the code paths for that are usually a slight bit faster
 		boolean[] orders = new boolean[numKeyFields];
+		TypeInformation[] typeInfos = new TypeInformation[numKeyFields];
 		for (int i = 0; i < numKeyFields; i++) {
 			orders[i] = true;
+			typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
 		}
-		
+
 		TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
-		return new ComparableKeySelector<X>(comparator, numKeyFields);
+		return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
 	}
 
 	
@@ -70,7 +74,7 @@ public final class KeySelectorUtil {
 
 		TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
 				logicalKeyPositions, new boolean[1], 0, executionConfig);
-		return new OneKeySelector<X, K>(comparator);
+		return new OneKeySelector<>(comparator);
 	}
 
 	/**
@@ -111,21 +115,23 @@ public final class KeySelectorUtil {
 	 *
 	 * @param <IN> The type from which the key is extracted.
 	 */
-	public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
+	public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
 
 		private static final long serialVersionUID = 1L;
 
 		private final TypeComparator<IN> comparator;
 		private final int keyLength;
+		private final TupleTypeInfo tupleTypeInfo;
 
 		/** Reusable array to hold the key objects. Since this is initially empty (all positions
 		 * are null), it does not have any serialization problems */
 		@SuppressWarnings("NonSerializableFieldInSerializableClass")
 		private final Object[] keyArray;
 
-		public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength) {
+		public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo tupleTypeInfo) {
 			this.comparator = comparator;
 			this.keyLength = keyLength;
+			this.tupleTypeInfo = tupleTypeInfo;
 			keyArray = new Object[keyLength];
 		}
 
@@ -139,6 +145,10 @@ public final class KeySelectorUtil {
 			return key;
 		}
 
+		@Override
+		public TypeInformation<Tuple> getProducedType() {
+			return tupleTypeInfo;
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e494c279/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 19bcb73..8aeacb4 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
@@ -230,8 +231,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {

     val cleanFun = clean(fun)
-    val keyExtractor = new KeySelector[T, K] {
+    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] { // key type from K's context bound
       def getKey(in: T) = cleanFun(in)
+      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
     }
     javaStream.keyBy(keyExtractor)
   }
@@ -256,8 +258,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {

     val cleanFun = clean(fun)
-    val keyExtractor = new KeySelector[T, K] {
+    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] { // key type from K's context bound
       def getKey(in: T) = cleanFun(in)
+      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
     }
     javaStream.partitionByHash(keyExtractor)
   }
@@ -293,8 +296,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K)
   : DataStream[T] = {
     val cleanFun = clean(fun)
-    val keyExtractor = new KeySelector[T, K] {
+    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] { // key type from K's context bound
       def getKey(in: T) = cleanFun(in)
+      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
     }
     javaStream.partitionCustom(partitioner, keyExtractor)
   }