You are viewing a plain text version of this content. The canonical version is linked from the original archive page (the link is not preserved in this text).
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/18 13:49:02 UTC
[4/7] flink git commit: [FLINK-2627] [scala api] Makes Scala DataSet utils easier to access.
[FLINK-2627] [scala api] Makes the Scala DataSet utilities easier to access.
The required import is now `org.apache.flink.api.scala.utils._`.
Also adds a helper, `createTuple2TypeInformation`, that creates case-class type information for Scala 2-tuples (replacing several duplicated inline definitions).
This closes #1099
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c5ebc8b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c5ebc8b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c5ebc8b
Branch: refs/heads/master
Commit: 0c5ebc8b9361e48c32425f1235a86e5e1bf34976
Parents: 3fe9145
Author: Sachin Goel <sa...@gmail.com>
Authored: Sat Sep 5 18:42:38 2015 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 11:42:28 2015 +0200
----------------------------------------------------------------------
.../apache/flink/api/scala/CrossDataSet.scala | 24 +----
.../apache/flink/api/scala/DataSetUtils.scala | 103 ------------------
.../api/scala/UnfinishedCoGroupOperation.scala | 29 +----
.../apache/flink/api/scala/joinDataSet.scala | 23 +---
.../org/apache/flink/api/scala/package.scala | 33 +++++-
.../apache/flink/api/scala/utils/package.scala | 108 +++++++++++++++++++
.../api/scala/StreamCrossOperator.scala | 25 +----
.../api/scala/StreamJoinOperator.scala | 28 +----
.../flink/streaming/api/scala/package.scala | 8 +-
.../api/scala/operators/SampleITCase.scala | 4 +-
.../api/scala/util/DataSetUtilsITCase.scala | 6 +-
11 files changed, 160 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
index 19818a0..c9e1540 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
@@ -17,14 +17,10 @@
*/
package org.apache.flink.api.scala
-import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.{CrossFunction, RichCrossFunction}
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.util.Collector
import scala.reflect.ClassTag
@@ -111,25 +107,7 @@ private[flink] object CrossDataSet {
(left, right)
}
}
- val returnType = new CaseClassTypeInfo[(L, R)](
- classOf[(L, R)],
- Array(leftInput.getType, rightInput.getType),
- Seq(leftInput.getType, rightInput.getType),
- Array("_1", "_2")) {
-
- override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(L, R)] = {
- val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
- for (i <- 0 until getArity) {
- fieldSerializers(i) = types(i).createSerializer(executionConfig)
- }
-
- new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
- override def createInstance(fields: Array[AnyRef]) = {
- (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
- }
- }
- }
- }
+ val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
val crossOperator = new CrossOperator[L, R, (L, R)](
leftInput.javaSet,
rightInput.javaSet,
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
deleted file mode 100644
index 793b201..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{Utils, utils => jutils}
-
-import _root_.scala.language.implicitConversions
-import _root_.scala.reflect.ClassTag
-
-/**
- * This class provides simple utility methods for zipping elements in a data set with an index
- * or with a unique identifier.
- */
-
-class DataSetUtils[T](val self: DataSet[T]) {
-
- /**
- * Method that takes a set of subtask index, total number of elements mappings
- * and assigns ids to all the elements from the input data set.
- *
- * @return a data set of tuple 2 consisting of consecutive ids and initial values.
- */
- def zipWithIndex(implicit ti: TypeInformation[(Long, T)],
- ct: ClassTag[(Long, T)]): DataSet[(Long, T)] = {
- wrap(jutils.DataSetUtils.zipWithIndex(self.javaSet))
- .map { t => (t.f0.toLong, t.f1) }
- }
-
- /**
- * Method that assigns a unique id to all the elements of the input data set.
- *
- * @return a data set of tuple 2 consisting of ids and initial values.
- */
- def zipWithUniqueId(implicit ti: TypeInformation[(Long, T)],
- ct: ClassTag[(Long, T)]): DataSet[(Long, T)] = {
- wrap(jutils.DataSetUtils.zipWithUniqueId(self.javaSet))
- .map { t => (t.f0.toLong, t.f1) }
- }
-
- // --------------------------------------------------------------------------------------------
- // Sample
- // --------------------------------------------------------------------------------------------
- /**
- * Generate a sample of DataSet by the probability fraction of each element.
- *
- * @param withReplacement Whether element can be selected more than once.
- * @param fraction Probability that each element is chosen, should be [0,1] without
- * replacement, and [0, ∞) with replacement. While fraction is larger
- * than 1, the elements are expected to be selected multi times into
- * sample on average.
- * @param seed Random number generator seed.
- * @return The sampled DataSet
- */
- def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.RNG.nextLong())
- (implicit ti: TypeInformation[T], ct: ClassTag[T]): DataSet[T] = {
-
- wrap(jutils.DataSetUtils.sample(self.javaSet, withReplacement, fraction, seed))
- }
-
- /**
- * Generate a sample of DataSet with fixed sample size.
- * <p>
- * <strong>NOTE:</strong> Sample with fixed size is not as efficient as sample with fraction,
- * use sample with fraction unless you need exact precision.
- * <p/>
- *
- * @param withReplacement Whether element can be selected more than once.
- * @param numSample The expected sample size.
- * @param seed Random number generator seed.
- * @return The sampled DataSet
- */
- def sampleWithSize(withReplacement: Boolean, numSample: Int, seed: Long = Utils.RNG.nextLong())
- (implicit ti: TypeInformation[T], ct: ClassTag[T]): DataSet[T] = {
-
- wrap(jutils.DataSetUtils.sampleWithSize(self.javaSet, withReplacement, numSample, seed))
- }
-}
-
-object DataSetUtils {
-
- /**
- * Tie the new class to an existing Scala API class: DataSet.
- */
- implicit def utilsToDataSet[T: TypeInformation: ClassTag](dataSet: DataSet[T]) =
- new DataSetUtils[T](dataSet)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
index 98c8c31..91f8c85 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
@@ -18,12 +18,10 @@
package org.apache.flink.api.scala
-import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -63,31 +61,12 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
// Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an array class.
val leftArrayType =
ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, leftInput.getType)
+ .asInstanceOf[TypeInformation[Array[L]]]
val rightArrayType =
ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, rightInput.getType)
+ .asInstanceOf[TypeInformation[Array[R]]]
- val returnType = new CaseClassTypeInfo[(Array[L], Array[R])](
- classOf[(Array[L], Array[R])],
- Array(leftArrayType, rightArrayType),
- Seq(leftArrayType, rightArrayType),
- Array("_1", "_2")) {
-
- override def createSerializer(
- executionConfig: ExecutionConfig): TypeSerializer[(Array[L], Array[R])] = {
- val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
- for (i <- 0 until getArity) {
- fieldSerializers(i) = types(i).createSerializer(executionConfig)
- }
-
- new CaseClassSerializer[(Array[L], Array[R])](
- classOf[(Array[L], Array[R])],
- fieldSerializers) {
- override def createInstance(fields: Array[AnyRef]) = {
- (fields(0).asInstanceOf[Array[L]], fields(1).asInstanceOf[Array[R]])
- }
- }
- }
- }
+ val returnType = createTuple2TypeInformation[Array[L], Array[R]](leftArrayType, rightArrayType)
val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType,
null, // partitioner
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index 0ed74f4..ecc1aab 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -17,15 +17,12 @@
*/
package org.apache.flink.api.scala
-import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction, Partitioner, RichFlatJoinFunction}
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlatJoinFunction
import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
import org.apache.flink.api.java.operators._
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.util.Collector
import scala.reflect.ClassTag
@@ -232,25 +229,7 @@ class UnfinishedJoinOperation[L, R](
out.collect((left, right))
}
}
- val returnType = new CaseClassTypeInfo[(L, R)](
- classOf[(L, R)],
- Array(leftSet.getType, rightSet.getType),
- Seq(leftSet.getType, rightSet.getType),
- Array("_1", "_2")) {
-
- override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(L, R)] = {
- val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
- for (i <- 0 until getArity) {
- fieldSerializers(i) = types(i).createSerializer(executionConfig)
- }
-
- new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
- override def createInstance(fields: Array[AnyRef]) = {
- (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
- }
- }
- }
- }
+ val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
val joinOperator = new EquiJoin[L, R, (L, R)](
leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint,
getCallLocationName())
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index a0c0b58..db9c68c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -18,11 +18,14 @@
package org.apache.flink.api
-import _root_.scala.reflect.ClassTag
-import language.experimental.macros
+import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
+import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, TypeUtils}
+
+import _root_.scala.reflect.ClassTag
+import language.experimental.macros
/**
* The Flink Scala API. [[org.apache.flink.api.scala.ExecutionEnvironment]] is the starting-point
@@ -70,4 +73,28 @@ package object scala {
}
st(depth).toString
}
+
+ def createTuple2TypeInformation[T1, T2](
+ t1: TypeInformation[T1],
+ t2: TypeInformation[T2])
+ : TypeInformation[(T1, T2)] =
+ new CaseClassTypeInfo[(T1, T2)](
+ classOf[(T1, T2)],
+ Array(t1, t2),
+ Seq(t1, t2),
+ Array("_1", "_2")) {
+
+ override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(T1, T2)] = {
+ val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+ for (i <- 0 until getArity) {
+ fieldSerializers(i) = types(i).createSerializer(executionConfig)
+ }
+
+ new CaseClassSerializer[(T1, T2)](classOf[(T1, T2)], fieldSerializers) {
+ override def createInstance(fields: Array[AnyRef]) = {
+ (fields(0).asInstanceOf[T1], fields(1).asInstanceOf[T2])
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
new file mode 100644
index 0000000..0d0f6e2
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.java.utils.{DataSetUtils => jutils}
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.reflect.ClassTag
+
+package object utils {
+
+ /**
+ * This class provides simple utility methods for zipping elements in a data set with an index
+ * or with a unique identifier, sampling elements from a data set.
+ *
+ * @param self Data Set
+ */
+ implicit class DataSetUtils[T: TypeInformation : ClassTag](val self: DataSet[T]) {
+
+ /**
+ * Method that takes a set of subtask index, total number of elements mappings
+ * and assigns ids to all the elements from the input data set.
+ *
+ * @return a data set of tuple 2 consisting of consecutive ids and initial values.
+ */
+ def zipWithIndex: DataSet[(Long, T)] = {
+ implicit val typeInfo = createTuple2TypeInformation[Long, T](
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ implicitly[TypeInformation[T]]
+ )
+ wrap(jutils.zipWithIndex(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+ }
+
+ /**
+ * Method that assigns a unique id to all the elements of the input data set.
+ *
+ * @return a data set of tuple 2 consisting of ids and initial values.
+ */
+ def zipWithUniqueId: DataSet[(Long, T)] = {
+ implicit val typeInfo = createTuple2TypeInformation[Long, T](
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ implicitly[TypeInformation[T]]
+ )
+ wrap(jutils.zipWithUniqueId(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Sample
+ // --------------------------------------------------------------------------------------------
+ /**
+ * Generate a sample of DataSet by the probability fraction of each element.
+ *
+ * @param withReplacement Whether element can be selected more than once.
+ * @param fraction Probability that each element is chosen, should be [0,1] without
+ * replacement, and [0, ∞) with replacement. While fraction is larger
+ * than 1, the elements are expected to be selected multi times into
+ * sample on average.
+ * @param seed Random number generator seed.
+ * @return The sampled DataSet
+ */
+ def sample(
+ withReplacement: Boolean,
+ fraction: Double,
+ seed: Long = Utils.RNG.nextLong())
+ : DataSet[T] = {
+ wrap(jutils.sample(self.javaSet, withReplacement, fraction, seed))
+ }
+
+ /**
+ * Generate a sample of DataSet with fixed sample size.
+ * <p>
+ * <strong>NOTE:</strong> Sample with fixed size is not as efficient as sample with fraction,
+ * use sample with fraction unless you need exact precision.
+ * <p/>
+ *
+ * @param withReplacement Whether element can be selected more than once.
+ * @param numSample The expected sample size.
+ * @param seed Random number generator seed.
+ * @return The sampled DataSet
+ */
+ def sampleWithSize(
+ withReplacement: Boolean,
+ numSample: Int,
+ seed: Long = Utils.RNG.nextLong())
+ : DataSet[T] = {
+ wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSample, seed))
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index 0e01eee..0060a9f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -20,13 +20,10 @@ package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit
-import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.CrossFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
import org.apache.flink.streaming.api.operators.co.CoStreamWindow
@@ -40,26 +37,8 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend
val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
(l: I1, r: I2) => (l, r))
- val returnType = new CaseClassTypeInfo[(I1, I2)](
- classOf[(I1, I2)],
- Array(input1.getType, input2.getType),
- Seq(input1.getType, input2.getType),
- Array("_1", "_2")) {
-
- override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = {
- val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
- for (i <- 0 until getArity) {
- fieldSerializers(i) = types(i).createSerializer(executionConfig)
- }
-
- new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
- override def createInstance(fields: Array[AnyRef]) = {
- (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
- }
- }
- }
- }
+ val returnType = createTuple2TypeInformation[I1, I2](input1.getType, input2.getType)
val javaStream = input1.connect(input2).addGeneralWindowCombine(
crossWindowFunction,
returnType, windowSize,
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index e872851..e2be44f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -20,17 +20,13 @@ package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit
-import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
import org.apache.flink.streaming.api.functions.co.JoinWindowFunction
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow
import org.apache.flink.streaming.util.keys.KeySelectorUtil
import scala.Array.canBuildFrom
@@ -155,27 +151,7 @@ object StreamJoinOperator {
private def createJoinOperator(): JavaStream[(I1, I2)] = {
- val returnType = new CaseClassTypeInfo[(I1, I2)](
- classOf[(I1, I2)],
- Array(op.input1.getType, op.input2.getType),
- Seq(op.input1.getType, op.input2.getType),
- Array("_1", "_2")) {
-
- override def createSerializer(
- executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = {
- val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
- for (i <- 0 until getArity) {
- fieldSerializers(i) = types(i).createSerializer(executionConfig)
- }
-
- new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
- override def createInstance(fields: Array[AnyRef]) = {
- (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
- }
- }
- }
- }
-
+ val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
.addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 4c7be5e..2eb4f9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api
import _root_.scala.reflect.ClassTag
import language.experimental.macros
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
@@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaS
import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream }
import language.implicitConversions
-import org.apache.flink.streaming.api.windowing.StreamWindow
package object scala {
// We have this here so that we always have generated TypeInformationS when
@@ -72,4 +72,10 @@ package object scala {
"supported on Case Classes (for now).")
}
}
+
+ def createTuple2TypeInformation[T1, T2](
+ t1: TypeInformation[T1],
+ t2: TypeInformation[T2])
+ : TypeInformation[(T1, T2)] =
+ apiTupleCreator[T1, T2](t1, t2)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
index 86b0818..1b62824 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
@@ -21,14 +21,14 @@ import java.util.{List => JavaList, Random}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.utils._
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.Assert._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.{Before, After, Test}
+import org.junit.{After, Before, Test}
-import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
index d973908..7fff8ff 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
@@ -19,11 +19,11 @@
package org.apache.flink.api.scala.util
import org.apache.flink.api.scala._
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit._
-import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
@RunWith(classOf[Parameterized])
class DataSetUtilsITCase (