Posted to commits@flink.apache.org by se...@apache.org on 2015/09/18 13:49:02 UTC

[4/7] flink git commit: [FLINK-2627] [scala api] Makes scala data set utils easier to access.

[FLINK-2627] [scala api] Makes scala data set utils easier to access.

The required import is now org.apache.flink.api.scala.utils._
This also adds a method to create case class type information for Scala tuples.

This closes #1099
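
For illustration, a minimal sketch of how the utilities are reached after this change; the environment setup and sample data below are illustrative, not part of the commit:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.utils._  // brings the implicit DataSetUtils class into scope

    object DataSetUtilsExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val data = env.fromElements("a", "b", "c")

        // Consecutive ids paired with the original values.
        val indexed: DataSet[(Long, String)] = data.zipWithIndex

        // Unique (but not necessarily consecutive) ids.
        val tagged: DataSet[(Long, String)] = data.zipWithUniqueId

        // Bernoulli sample without replacement; fraction should be in [0, 1].
        val sampled: DataSet[String] = data.sample(withReplacement = false, fraction = 0.5)

        indexed.print()
      }
    }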


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c5ebc8b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c5ebc8b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c5ebc8b

Branch: refs/heads/master
Commit: 0c5ebc8b9361e48c32425f1235a86e5e1bf34976
Parents: 3fe9145
Author: Sachin Goel <sa...@gmail.com>
Authored: Sat Sep 5 18:42:38 2015 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 11:42:28 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/api/scala/CrossDataSet.scala   |  24 +----
 .../apache/flink/api/scala/DataSetUtils.scala   | 103 ------------------
 .../api/scala/UnfinishedCoGroupOperation.scala  |  29 +----
 .../apache/flink/api/scala/joinDataSet.scala    |  23 +---
 .../org/apache/flink/api/scala/package.scala    |  33 +++++-
 .../apache/flink/api/scala/utils/package.scala  | 108 +++++++++++++++++++
 .../api/scala/StreamCrossOperator.scala         |  25 +----
 .../api/scala/StreamJoinOperator.scala          |  28 +----
 .../flink/streaming/api/scala/package.scala     |   8 +-
 .../api/scala/operators/SampleITCase.scala      |   4 +-
 .../api/scala/util/DataSetUtilsITCase.scala     |   6 +-
 11 files changed, 160 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
index 19818a0..c9e1540 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
@@ -17,14 +17,10 @@
  */
 package org.apache.flink.api.scala
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{CrossFunction, RichCrossFunction}
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
@@ -111,25 +107,7 @@ private[flink] object CrossDataSet {
         (left, right)
       }
     }
-    val returnType = new CaseClassTypeInfo[(L, R)](
-      classOf[(L, R)],
-      Array(leftInput.getType, rightInput.getType),
-      Seq(leftInput.getType, rightInput.getType),
-      Array("_1", "_2")) {
-
-      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(L, R)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer(executionConfig)
-        }
-
-        new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
-          }
-        }
-      }
-    }
+    val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
     val crossOperator = new CrossOperator[L, R, (L, R)](
       leftInput.javaSet,
       rightInput.javaSet,

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
deleted file mode 100644
index 793b201..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{Utils, utils => jutils}
-
-import _root_.scala.language.implicitConversions
-import _root_.scala.reflect.ClassTag
-
-/**
- * This class provides simple utility methods for zipping elements in a data set with an index
- * or with a unique identifier.
- */
-
-class DataSetUtils[T](val self: DataSet[T]) {
-
-  /**
-   * Method that takes a set of subtask index, total number of elements mappings
-   * and assigns ids to all the elements from the input data set.
-   *
-   * @return a data set of tuple 2 consisting of consecutive ids and initial values.
-   */
-  def zipWithIndex(implicit ti: TypeInformation[(Long, T)],
-                   ct: ClassTag[(Long, T)]): DataSet[(Long, T)] = {
-    wrap(jutils.DataSetUtils.zipWithIndex(self.javaSet))
-      .map { t => (t.f0.toLong, t.f1) }
-  }
-
-  /**
-   * Method that assigns a unique id to all the elements of the input data set.
-   *
-   * @return a data set of tuple 2 consisting of ids and initial values.
-   */
-  def zipWithUniqueId(implicit ti: TypeInformation[(Long, T)],
-                      ct: ClassTag[(Long, T)]): DataSet[(Long, T)] = {
-    wrap(jutils.DataSetUtils.zipWithUniqueId(self.javaSet))
-      .map { t => (t.f0.toLong, t.f1) }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Sample
-  // --------------------------------------------------------------------------------------------
-  /**
-   * Generate a sample of DataSet by the probability fraction of each element.
-   *
-   * @param withReplacement Whether element can be selected more than once.
-   * @param fraction        Probability that each element is chosen, should be [0,1] without
-   *                        replacement, and [0, ∞) with replacement. While fraction is larger
-   *                        than 1, the elements are expected to be selected multi times into
-   *                        sample on average.
-   * @param seed            Random number generator seed.
-   * @return The sampled DataSet
-   */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.RNG.nextLong())
-            (implicit ti: TypeInformation[T], ct: ClassTag[T]): DataSet[T] = {
-
-    wrap(jutils.DataSetUtils.sample(self.javaSet, withReplacement, fraction, seed))
-  }
-
-  /**
-   * Generate a sample of DataSet with fixed sample size.
-   * <p>
-   * <strong>NOTE:</strong> Sample with fixed size is not as efficient as sample with fraction,
-   * use sample with fraction unless you need exact precision.
-   * <p/>
-   *
-   * @param withReplacement Whether element can be selected more than once.
-   * @param numSample       The expected sample size.
-   * @param seed            Random number generator seed.
-   * @return The sampled DataSet
-   */
-  def sampleWithSize(withReplacement: Boolean, numSample: Int, seed: Long = Utils.RNG.nextLong())
-                    (implicit ti: TypeInformation[T], ct: ClassTag[T]): DataSet[T] = {
-
-    wrap(jutils.DataSetUtils.sampleWithSize(self.javaSet, withReplacement, numSample, seed))
-  }
-}
-
-object DataSetUtils {
-
-  /**
-   * Tie the new class to an existing Scala API class: DataSet.
-   */
-  implicit def utilsToDataSet[T: TypeInformation: ClassTag](dataSet: DataSet[T]) =
-    new DataSetUtils[T](dataSet)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
index 98c8c31..91f8c85 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
@@ -18,12 +18,10 @@
 
 package org.apache.flink.api.scala
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.util.Collector
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -63,31 +61,12 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
     // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an array class.
     val leftArrayType =
       ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, leftInput.getType)
+        .asInstanceOf[TypeInformation[Array[L]]]
     val rightArrayType =
       ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, rightInput.getType)
+        .asInstanceOf[TypeInformation[Array[R]]]
 
-    val returnType = new CaseClassTypeInfo[(Array[L], Array[R])](
-      classOf[(Array[L], Array[R])],
-      Array(leftArrayType, rightArrayType),
-      Seq(leftArrayType, rightArrayType),
-      Array("_1", "_2")) {
-
-      override def createSerializer(
-          executionConfig: ExecutionConfig): TypeSerializer[(Array[L], Array[R])] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer(executionConfig)
-        }
-
-        new CaseClassSerializer[(Array[L], Array[R])](
-          classOf[(Array[L], Array[R])],
-          fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[Array[L]], fields(1).asInstanceOf[Array[R]])
-          }
-        }
-      }
-    }
+    val returnType = createTuple2TypeInformation[Array[L], Array[R]](leftArrayType, rightArrayType)
     val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
       leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType,
       null, // partitioner

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index 0ed74f4..ecc1aab 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.api.scala
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction, Partitioner, RichFlatJoinFunction}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlatJoinFunction
 import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
 import org.apache.flink.api.java.operators._
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
@@ -232,25 +229,7 @@ class UnfinishedJoinOperation[L, R](
         out.collect((left, right))
       }
     }
-    val returnType = new CaseClassTypeInfo[(L, R)](
-      classOf[(L, R)],
-      Array(leftSet.getType, rightSet.getType),
-      Seq(leftSet.getType, rightSet.getType),
-      Array("_1", "_2")) {
-
-      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(L, R)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer(executionConfig)
-        }
-
-        new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
-          }
-        }
-      }
-    }
+    val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
     val joinOperator = new EquiJoin[L, R, (L, R)](
       leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint,
         getCallLocationName())

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index a0c0b58..db9c68c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -18,11 +18,14 @@
 
 package org.apache.flink.api
 
-import _root_.scala.reflect.ClassTag
-import language.experimental.macros
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
+import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, TypeUtils}
+
+import _root_.scala.reflect.ClassTag
+import language.experimental.macros
 
 /**
  * The Flink Scala API. [[org.apache.flink.api.scala.ExecutionEnvironment]] is the starting-point
@@ -70,4 +73,28 @@ package object scala {
     }
     st(depth).toString
   }
+
+  def createTuple2TypeInformation[T1, T2](
+      t1: TypeInformation[T1],
+      t2: TypeInformation[T2])
+    : TypeInformation[(T1, T2)] =
+    new CaseClassTypeInfo[(T1, T2)](
+      classOf[(T1, T2)],
+      Array(t1, t2),
+      Seq(t1, t2),
+      Array("_1", "_2")) {
+
+      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(T1, T2)] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer(executionConfig)
+        }
+
+        new CaseClassSerializer[(T1, T2)](classOf[(T1, T2)], fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[T1], fields(1).asInstanceOf[T2])
+          }
+        }
+      }
+    }
 }
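
A minimal sketch of calling the new helper directly; the inputs below are illustrative, and the cast mirrors the one used in utils/package.scala in this commit:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.scala.createTuple2TypeInformation

    object Tuple2TypeInfoExample {
      def main(args: Array[String]): Unit = {
        // BasicTypeInfo.LONG_TYPE_INFO is typed on boxed java.lang.Long,
        // hence the cast to Scala Long type information.
        val longInfo = BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
        val stringInfo = BasicTypeInfo.STRING_TYPE_INFO

        val tupleInfo: TypeInformation[(Long, String)] =
          createTuple2TypeInformation(longInfo, stringInfo)

        println(tupleInfo)
      }
    }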

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
new file mode 100644
index 0000000..0d0f6e2
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.java.utils.{DataSetUtils => jutils}
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.reflect.ClassTag
+
+package object utils {
+
+  /**
+   * This class provides simple utility methods for zipping elements in a data set with an index
+   * or with a unique identifier, sampling elements from a data set.
+   *
+   * @param self Data Set
+   */
+  implicit class DataSetUtils[T: TypeInformation : ClassTag](val self: DataSet[T]) {
+
+    /**
+     * Method that takes a set of subtask index, total number of elements mappings
+     * and assigns ids to all the elements from the input data set.
+     *
+     * @return a data set of tuple 2 consisting of consecutive ids and initial values.
+     */
+    def zipWithIndex: DataSet[(Long, T)] = {
+      implicit val typeInfo = createTuple2TypeInformation[Long, T](
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+        implicitly[TypeInformation[T]]
+      )
+      wrap(jutils.zipWithIndex(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+    }
+
+    /**
+     * Method that assigns a unique id to all the elements of the input data set.
+     *
+     * @return a data set of tuple 2 consisting of ids and initial values.
+     */
+    def zipWithUniqueId: DataSet[(Long, T)] = {
+      implicit val typeInfo = createTuple2TypeInformation[Long, T](
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+        implicitly[TypeInformation[T]]
+      )
+      wrap(jutils.zipWithUniqueId(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Sample
+    // --------------------------------------------------------------------------------------------
+    /**
+     * Generate a sample of DataSet by the probability fraction of each element.
+     *
+     * @param withReplacement Whether element can be selected more than once.
+     * @param fraction        Probability that each element is chosen, should be [0,1] without
+     *                        replacement, and [0, ∞) with replacement. While fraction is larger
+     *                        than 1, the elements are expected to be selected multi times into
+     *                        sample on average.
+     * @param seed            Random number generator seed.
+     * @return The sampled DataSet
+     */
+    def sample(
+        withReplacement: Boolean,
+        fraction: Double,
+        seed: Long = Utils.RNG.nextLong())
+      : DataSet[T] = {
+      wrap(jutils.sample(self.javaSet, withReplacement, fraction, seed))
+    }
+
+    /**
+     * Generate a sample of DataSet with fixed sample size.
+     * <p>
+     * <strong>NOTE:</strong> Sample with fixed size is not as efficient as sample with fraction,
+     * use sample with fraction unless you need exact precision.
+     * <p/>
+     *
+     * @param withReplacement Whether element can be selected more than once.
+     * @param numSample       The expected sample size.
+     * @param seed            Random number generator seed.
+     * @return The sampled DataSet
+     */
+    def sampleWithSize(
+        withReplacement: Boolean,
+        numSample: Int,
+        seed: Long = Utils.RNG.nextLong())
+      : DataSet[T] = {
+      wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSample, seed))
+    }
+  }
+
+}
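
With the implicit class above in scope, the samplers are likewise available directly on the Scala DataSet; a brief sketch, with illustrative data values:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.utils._

    object FixedSizeSampleExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val numbers = env.fromCollection(1 to 100)

        // Draws exactly 10 elements; per the scaladoc above, prefer the
        // fraction-based sample unless an exact size is needed.
        val ten: DataSet[Int] = numbers.sampleWithSize(withReplacement = false, numSample = 10)

        ten.print()
      }
    }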

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index 0e01eee..0060a9f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -20,13 +20,10 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
 import org.apache.flink.streaming.api.operators.co.CoStreamWindow
 
@@ -40,26 +37,8 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend
     val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
       (l: I1, r: I2) => (l, r))
 
-    val returnType = new CaseClassTypeInfo[(I1, I2)](
-      classOf[(I1, I2)],
-      Array(input1.getType, input2.getType),
-      Seq(input1.getType, input2.getType),
-      Array("_1", "_2")) {
-
-      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer(executionConfig)
-        }
-
-        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-          }
-        }
-      }
-    }
 
+    val returnType = createTuple2TypeInformation[I1, I2](input1.getType, input2.getType)
     val javaStream = input1.connect(input2).addGeneralWindowCombine(
       crossWindowFunction,
       returnType, windowSize,

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index e872851..e2be44f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -20,17 +20,13 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co.JoinWindowFunction
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 
 import scala.Array.canBuildFrom
@@ -155,27 +151,7 @@ object StreamJoinOperator {
 
     private def createJoinOperator(): JavaStream[(I1, I2)] = {
 
-      val returnType = new CaseClassTypeInfo[(I1, I2)](
-        classOf[(I1, I2)],
-        Array(op.input1.getType, op.input2.getType),
-        Seq(op.input1.getType, op.input2.getType),
-        Array("_1", "_2")) {
-
-        override def createSerializer(
-            executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = {
-          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-          for (i <- 0 until getArity) {
-            fieldSerializers(i) = types(i).createSerializer(executionConfig)
-          }
-
-          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-            override def createInstance(fields: Array[AnyRef]) = {
-              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-            }
-          }
-        }
-      }
-
+      val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
       op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
         .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
           returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 4c7be5e..2eb4f9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api
 import _root_.scala.reflect.ClassTag
 import language.experimental.macros
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
 import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
@@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaS
 import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
 import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream }
 import language.implicitConversions
-import org.apache.flink.streaming.api.windowing.StreamWindow
 
 package object scala {
   // We have this here so that we always have generated TypeInformationS when
@@ -72,4 +72,10 @@ package object scala {
           "supported on Case Classes (for now).")
     }
   }
+
+  def createTuple2TypeInformation[T1, T2](
+      t1: TypeInformation[T1],
+      t2: TypeInformation[T2])
+    : TypeInformation[(T1, T2)] =
+    apiTupleCreator[T1, T2](t1, t2)
 }
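
A minimal sketch of reaching the same helper through the streaming namespace (inputs illustrative); the streaming package object simply forwards to the batch implementation:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.streaming.api.scala.createTuple2TypeInformation

    object StreamingTupleTypeInfoExample {
      def main(args: Array[String]): Unit = {
        // Delegates to org.apache.flink.api.scala.createTuple2TypeInformation.
        val info = createTuple2TypeInformation(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.INT_TYPE_INFO)
        println(info)
      }
    }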

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
index 86b0818..1b62824 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
@@ -21,14 +21,14 @@ import java.util.{List => JavaList, Random}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.utils._
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.junit.Assert._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Before, After, Test}
+import org.junit.{After, Before, Test}
 
-import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
index d973908..7fff8ff 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
@@ -19,11 +19,11 @@
 package org.apache.flink.api.scala.util
 
 import org.apache.flink.api.scala._
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit._
-import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
 
 @RunWith(classOf[Parameterized])
 class DataSetUtilsITCase (