You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/11/05 13:52:15 UTC

[3/3] git commit: [FLINK-1191] Add support for Scala Collections and Special Types

[FLINK-1191] Add support for Scala Collections and Special Types

"The special types" are Option and Either. This should work for all
Scala collections except SortedSet and SortedMap, for which the type
checker prints an error message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bd66a08e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bd66a08e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bd66a08e

Branch: refs/heads/master
Commit: bd66a08ed6a71dd0573b7cf467a7b2b6d20ab148
Parents: 1e1df6d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Nov 4 14:59:57 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 5 12:34:57 2014 +0100

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/LICENSE           |   1 +
 .../flink/api/scala/codegen/TypeAnalyzer.scala  | 144 +++++++++----
 .../api/scala/codegen/TypeDescriptors.scala     |  40 +++-
 .../api/scala/codegen/TypeInformationGen.scala  |  86 +++++++-
 .../api/scala/typeutils/EitherSerializer.scala  |  96 +++++++++
 .../api/scala/typeutils/EitherTypeInfo.scala    |  49 +++++
 .../api/scala/typeutils/NothingSerializer.scala |  61 ++++++
 .../api/scala/typeutils/OptionSerializer.scala  |  81 ++++++++
 .../api/scala/typeutils/OptionTypeInfo.scala    |  46 ++++
 .../scala/typeutils/TraversableSerializer.scala | 135 ++++++++++++
 .../scala/typeutils/TraversableTypeInfo.scala   |  43 ++++
 .../scala/runtime/ScalaSpecialTypesITCase.scala | 177 ++++++++++++++++
 .../ScalaSpecialTypesSerializerTest.scala       | 131 ++++++++++++
 .../runtime/TraversableSerializerTest.scala     | 208 +++++++++++++++++++
 14 files changed, 1247 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index 39fe8bc..9ba509c 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -297,6 +297,7 @@ BSD-style licenses:
  - Scala Library (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - Scala Compiler (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - Scala Compiler Reflect (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
+ - Scala Quasiquotes (BSD-like) - (http://scalamacros.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - ASM (BSD-like) - (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index 493386e..64b9543 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -17,8 +17,7 @@
  */
 package org.apache.flink.api.scala.codegen
 
-import scala.collection.GenTraversableOnce
-import scala.collection.mutable
+import scala.collection._
 import scala.reflect.macros.Context
 import scala.util.DynamicVariable
 
@@ -56,30 +55,71 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
       cache.getOrElseUpdate(tpe) { id =>
         tpe match {
           case PrimitiveType(default, wrapper) => PrimitiveDescriptor(id, tpe, default, wrapper)
+
           case BoxedPrimitiveType(default, wrapper, box, unbox) =>
             BoxedPrimitiveDescriptor(id, tpe, default, wrapper, box, unbox)
-          case ListType(elemTpe, iter) =>
-            analyzeList(id, tpe, elemTpe, iter)
+
+          case ArrayType(elemTpe) => analyzeArray(id, tpe, elemTpe)
+
+          case NothingType() => NothingDesciptor(id, tpe)
+
+          case TraversableType(elemTpe) => analyzeTraversable(id, tpe, elemTpe)
+
+          case EitherType(leftTpe, rightTpe) => analyzeEither(id, tpe, leftTpe, rightTpe)
+
+          case OptionType(elemTpe) => analyzeOption(id, tpe, elemTpe)
+
+          case CaseClassType() => analyzeCaseClass(id, tpe)
+
           case ValueType() => ValueDescriptor(id, tpe)
+
           case WritableType() => WritableDescriptor(id, tpe)
-          case CaseClassType() => analyzeCaseClass(id, tpe)
+
           case JavaType() =>
             // It's a Java Class, let the TypeExtractor deal with it...
             c.warning(c.enclosingPosition, s"Type $tpe is a java class. Will be analyzed by " +
               s"TypeExtractor at runtime.")
             GenericClassDescriptor(id, tpe)
+
           case _ => analyzePojo(id, tpe)
         }
       }
     }
 
-    private def analyzeList(
+    private def analyzeArray(
         id: Int,
         tpe: Type,
-        elemTpe: Type,
-        iter: Tree => Tree): UDTDescriptor = analyze(elemTpe) match {
+        elemTpe: Type): UDTDescriptor = analyze(elemTpe) match {
       case UnsupportedDescriptor(_, _, errs) => UnsupportedDescriptor(id, tpe, errs)
-      case desc => ListDescriptor(id, tpe, iter, desc)
+      case desc => ArrayDescriptor(id, tpe, desc)
+    }
+
+    private def analyzeTraversable(
+        id: Int,
+        tpe: Type,
+        elemTpe: Type): UDTDescriptor = analyze(elemTpe) match {
+      case UnsupportedDescriptor(_, _, errs) => UnsupportedDescriptor(id, tpe, errs)
+      case desc => TraversableDescriptor(id, tpe, desc)
+    }
+
+    private def analyzeEither(
+        id: Int,
+        tpe: Type,
+        leftTpe: Type,
+        rightTpe: Type): UDTDescriptor = analyze(leftTpe) match {
+      case UnsupportedDescriptor(_, _, errs) => UnsupportedDescriptor(id, tpe, errs)
+      case leftDesc => analyze(rightTpe) match {
+        case UnsupportedDescriptor(_, _, errs) => UnsupportedDescriptor(id, tpe, errs)
+        case rightDesc => EitherDescriptor(id, tpe, leftDesc, rightDesc)
+      }
+    }
+
+    private def analyzeOption(
+        id: Int,
+        tpe: Type,
+        elemTpe: Type): UDTDescriptor = analyze(elemTpe) match {
+      case UnsupportedDescriptor(_, _, errs) => UnsupportedDescriptor(id, tpe, errs)
+      case elemDesc => OptionDescriptor(id, tpe, elemDesc)
     }
 
     private def analyzePojo(id: Int, tpe: Type): UDTDescriptor = {
@@ -99,7 +139,7 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
         .filterNot { _.annotations.exists( _.tpe <:< typeOf[scala.transient]) }
 
       if (fields.isEmpty) {
-        c.warning(c.enclosingPosition, "Type $tpe has no fields that are visible from Scala Type" +
+        c.warning(c.enclosingPosition, s"Type $tpe has no fields that are visible from Scala Type" +
           " analysis. Falling back to Java Type Analysis (TypeExtractor).")
         return GenericClassDescriptor(id, tpe)
       }
@@ -210,48 +250,72 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
         boxedPrimitives.get(tpe.typeSymbol)
     }
 
-    private object ListType {
+    private object ArrayType {
+      def unapply(tpe: Type): Option[Type] = tpe match {
+        case TypeRef(_, _, elemTpe :: Nil) if tpe <:< typeOf[Array[_]] => Some(elemTpe)
+        case _ => None
+      }
+    }
 
-      def unapply(tpe: Type): Option[(Type, Tree => Tree)] = tpe match {
 
-        case ArrayType(elemTpe) =>
-          val iter = { source: Tree => 
-            Select(source, newTermName("iterator"))
-          }
-          Some(elemTpe, iter)
+    private object TraversableType {
+      def unapply(tpe: Type): Option[Type] = tpe match {
+        case _ if tpe <:< typeOf[BitSet] => Some(typeOf[Int])
 
-        case TraversableType(elemTpe) =>
-          val iter = { source: Tree => Select(source, newTermName("toIterator")) }
-          Some(elemTpe, iter)
+        case _ if tpe <:< typeOf[SortedMap[_, _]] => None
+        case _ if tpe <:< typeOf[SortedSet[_]] => None
+
+        case _ if tpe <:< typeOf[TraversableOnce[_]] =>
+//          val traversable = tpe.baseClasses
+//            .map(tpe.baseType)
+//            .find(t => t.erasure =:= typeOf[TraversableOnce[_]].erasure)
+
+          val traversable = tpe.baseType(typeOf[TraversableOnce[_]].typeSymbol)
+
+          traversable match {
+            case TypeRef(_, _, elemTpe :: Nil) =>
+              Some(elemTpe.asSeenFrom(tpe, tpe.typeSymbol))
+            case _ => None
+          }
 
         case _ => None
       }
+    }
 
-      private object ArrayType {
-        def unapply(tpe: Type): Option[Type] = tpe match {
-          case TypeRef(_, _, elemTpe :: Nil) if tpe <:< typeOf[Array[_]] => Some(elemTpe)
-          case _ => None
-        }
-      }
+    private object CaseClassType {
+      def unapply(tpe: Type): Boolean = tpe.typeSymbol.asClass.isCaseClass
+    }
+
+    private object NothingType {
+      def unapply(tpe: Type): Boolean = tpe =:= typeOf[Nothing]
+    }
 
-      private object TraversableType {
-        def unapply(tpe: Type): Option[Type] = tpe match {
-          case _ if tpe <:< typeOf[GenTraversableOnce[_]] =>
-            // val abstrElemTpe = genTraversableOnceClass.typeConstructor.typeParams.head.tpe
-            // val elemTpe = abstrElemTpe.asSeenFrom(tpe, genTraversableOnceClass)
-            // Some(elemTpe)
-            // TODO make sure this works as it should
-            tpe match {
-              case TypeRef(_, _, elemTpe :: Nil) => Some(elemTpe.asSeenFrom(tpe, tpe.typeSymbol))
-            }
-
-          case _ => None
+    private object EitherType {
+      def unapply(tpe: Type): Option[(Type, Type)] = {
+        if (tpe <:< typeOf[Either[_, _]]) {
+          val either = tpe.baseType(typeOf[Either[_, _]].typeSymbol)
+          either match {
+            case TypeRef(_, _, leftTpe :: rightTpe :: Nil) =>
+              Some(leftTpe, rightTpe)
+          }
+        } else {
+          None
         }
       }
     }
 
-    private object CaseClassType {
-      def unapply(tpe: Type): Boolean = tpe.typeSymbol.asClass.isCaseClass
+    private object OptionType {
+      def unapply(tpe: Type): Option[Type] = {
+        if (tpe <:< typeOf[Option[_]]) {
+          val option = tpe.baseType(typeOf[Option[_]].typeSymbol)
+          option match {
+            case TypeRef(_, _, elemTpe :: Nil) =>
+              Some(elemTpe)
+          }
+        } else {
+          None
+        }
+      }
     }
 
     private object ValueType {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
index 66299c7..f96cde5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
@@ -80,6 +80,27 @@ private[flink] trait TypeDescriptors[C <: Context] { this: MacroContextHolder[C]
     override def canBeKey = wrapper <:< typeOf[org.apache.flink.types.Key[_]]
   }
 
+  case class NothingDesciptor(id: Int, tpe: Type)
+    extends UDTDescriptor {
+    override val isPrimitiveProduct = false
+    override def flatten = Seq(this)
+    override def canBeKey = false
+  }
+
+  case class EitherDescriptor(id: Int, tpe: Type, left: UDTDescriptor, right: UDTDescriptor)
+    extends UDTDescriptor {
+    override val isPrimitiveProduct = false
+    override def flatten = Seq(this)
+    override def canBeKey = false
+  }
+
+  case class OptionDescriptor(id: Int, tpe: Type, elem: UDTDescriptor)
+    extends UDTDescriptor {
+    override val isPrimitiveProduct = false
+    override def flatten = Seq(this)
+    override def canBeKey = false
+  }
+
   case class BoxedPrimitiveDescriptor(
       id: Int, tpe: Type, default: Literal, wrapper: Type, box: Tree => Tree, unbox: Tree => Tree)
     extends UDTDescriptor {
@@ -96,19 +117,32 @@ private[flink] trait TypeDescriptors[C <: Context] { this: MacroContextHolder[C]
     }
   }
 
-  case class ListDescriptor(id: Int, tpe: Type, iter: Tree => Tree, elem: UDTDescriptor)
+  case class ArrayDescriptor(id: Int, tpe: Type, elem: UDTDescriptor)
+    extends UDTDescriptor {
+    override def canBeKey = false
+    override def flatten = this +: elem.flatten
+
+    override def hashCode() = (id, tpe, elem).hashCode()
+    override def equals(that: Any) = that match {
+      case that @ ArrayDescriptor(thatId, thatTpe, thatElem) =>
+        (id, tpe, elem).equals((thatId, thatTpe, thatElem))
+      case _ => false
+    }
+  }
+
+  case class TraversableDescriptor(id: Int, tpe: Type, elem: UDTDescriptor)
     extends UDTDescriptor {
     override def canBeKey = false
     override def flatten = this +: elem.flatten
 
     def getInnermostElem: UDTDescriptor = elem match {
-      case list: ListDescriptor => list.getInnermostElem
+      case list: TraversableDescriptor => list.getInnermostElem
       case _                    => elem
     }
 
     override def hashCode() = (id, tpe, elem).hashCode()
     override def equals(that: Any) = that match {
-      case that @ ListDescriptor(thatId, thatTpe,  _, thatElem) =>
+      case that @ TraversableDescriptor(thatId, thatTpe, thatElem) =>
         (id, tpe, elem).equals((thatId, thatTpe, thatElem))
       case _ => false
     }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 187ec7d..784c5fd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -19,15 +19,12 @@ package org.apache.flink.api.scala.codegen
 
 import java.lang.reflect.{Field, Modifier}
 
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo._
 
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.types.Value
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.hadoop.io.Writable
 
 import scala.collection.JavaConverters._
@@ -54,22 +51,36 @@ private[flink] trait TypeInformationGen[C <: Context] {
   // TypeInformation from a tree of UDTDescriptor
   def mkTypeInfo[T: c.WeakTypeTag](desc: UDTDescriptor): c.Expr[TypeInformation[T]] = desc match {
     case cc@CaseClassDescriptor(_, tpe, _, _, _) =>
-      mkTupleTypeInfo(cc)(c.WeakTypeTag(tpe).asInstanceOf[c.WeakTypeTag[Product]])
+      mkCaseClassTypeInfo(cc)(c.WeakTypeTag(tpe).asInstanceOf[c.WeakTypeTag[Product]])
         .asInstanceOf[c.Expr[TypeInformation[T]]]
+
     case p : PrimitiveDescriptor => mkPrimitiveTypeInfo(p.tpe)
     case p : BoxedPrimitiveDescriptor => mkPrimitiveTypeInfo(p.tpe)
-    case l : ListDescriptor if l.tpe <:< typeOf[Array[_]] => mkListTypeInfo(l)
+
+    case n: NothingDesciptor => reify { null.asInstanceOf[TypeInformation[T]] }
+
+    case e: EitherDescriptor => mkEitherTypeInfo(e)
+
+    case o: OptionDescriptor => mkOptionTypeInfo(o)
+
+    case a : ArrayDescriptor => mkArrayTypeInfo(a)
+
+    case l : TraversableDescriptor => mkTraversableTypeInfo(l)
+
     case v : ValueDescriptor =>
       mkValueTypeInfo(v)(c.WeakTypeTag(v.tpe).asInstanceOf[c.WeakTypeTag[Value]])
         .asInstanceOf[c.Expr[TypeInformation[T]]]
+
     case d : WritableDescriptor =>
       mkWritableTypeInfo(d)(c.WeakTypeTag(d.tpe).asInstanceOf[c.WeakTypeTag[Writable]])
         .asInstanceOf[c.Expr[TypeInformation[T]]]
+
     case pojo: PojoDescriptor => mkPojo(pojo)
+
     case d => mkGenericTypeInfo(d)
   }
 
-  def mkTupleTypeInfo[T <: Product : c.WeakTypeTag](
+  def mkCaseClassTypeInfo[T <: Product : c.WeakTypeTag](
       desc: CaseClassDescriptor): c.Expr[TypeInformation[T]] = {
     val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
     val fields = desc.getters.toList map { field =>
@@ -98,10 +109,69 @@ private[flink] trait TypeInformationGen[C <: Context] {
     }
   }
 
-  def mkListTypeInfo[T: c.WeakTypeTag](desc: ListDescriptor): c.Expr[TypeInformation[T]] = {
+  def mkEitherTypeInfo[T: c.WeakTypeTag](desc: EitherDescriptor): c.Expr[TypeInformation[T]] = {
+
+    val eitherClass = c.Expr[Class[T]](Literal(Constant(weakTypeOf[T])))
+    val leftTypeInfo = mkTypeInfo(desc.left)(c.WeakTypeTag(desc.left.tpe))
+    val rightTypeInfo = mkTypeInfo(desc.right)(c.WeakTypeTag(desc.right.tpe))
+
+    val result = q"""
+      import org.apache.flink.api.scala.typeutils.EitherTypeInfo
+
+      new EitherTypeInfo[${desc.left.tpe}, ${desc.right.tpe}, ${desc.tpe}](
+        $eitherClass,
+        $leftTypeInfo,
+        $rightTypeInfo)
+    """
+
+    c.Expr[TypeInformation[T]](result)
+  }
+
+  def mkOptionTypeInfo[T: c.WeakTypeTag](desc: OptionDescriptor): c.Expr[TypeInformation[T]] = {
+
+    val elemTypeInfo = mkTypeInfo(desc.elem)(c.WeakTypeTag(desc.elem.tpe))
+
+    val result = q"""
+      import org.apache.flink.api.scala.typeutils.OptionTypeInfo
+
+      new OptionTypeInfo[${desc.elem.tpe}, ${desc.tpe}]($elemTypeInfo)
+    """
+
+    c.Expr[TypeInformation[T]](result)
+  }
+
+  def mkTraversableTypeInfo[T: c.WeakTypeTag](
+      desc: TraversableDescriptor): c.Expr[TypeInformation[T]] = {
+    val collectionClass = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
+    val elementClazz = c.Expr[Class[T]](Literal(Constant(desc.elem.tpe)))
+    val elementTypeInfo = mkTypeInfo(desc.elem)(c.WeakTypeTag(desc.elem.tpe))
+
+    val cbf = q"implicitly[CanBuildFrom[${desc.tpe}, ${desc.elem.tpe}, ${desc.tpe}]]"
+
+    val result = q"""
+      import scala.collection.generic.CanBuildFrom
+      import org.apache.flink.api.scala.typeutils.TraversableTypeInfo
+      import org.apache.flink.api.scala.typeutils.TraversableSerializer
+
+      val elementTpe = $elementTypeInfo
+      new TraversableTypeInfo($collectionClass, elementTpe) {
+        def createSerializer() = {
+          new TraversableSerializer[${desc.tpe}, ${desc.elem.tpe}](
+              elementTpe.createSerializer) {
+            def getCbf = implicitly[CanBuildFrom[${desc.tpe}, ${desc.elem.tpe}, ${desc.tpe}]]
+          }
+        }
+      }
+    """
+
+    c.Expr[TypeInformation[T]](result)
+  }
+
+  def mkArrayTypeInfo[T: c.WeakTypeTag](desc: ArrayDescriptor): c.Expr[TypeInformation[T]] = {
     val arrayClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
     val elementClazz = c.Expr[Class[T]](Literal(Constant(desc.elem.tpe)))
     val elementTypeInfo = mkTypeInfo(desc.elem)(c.WeakTypeTag(desc.elem.tpe))
+
     desc.elem match {
       // special case for string, which in scala is a primitive, but not in java
       case p: PrimitiveDescriptor if p.tpe <:< typeOf[String] =>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
new file mode 100644
index 0000000..d28e9dd
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+
+/**
+ * Serializer for [[Either]].
+ */
+class EitherSerializer[A, B, T <: Either[A, B]](
+    val leftSerializer: TypeSerializer[A],
+    val rightSerializer: TypeSerializer[B])
+  extends TypeSerializer[T] {
+
+  override def isStateful: Boolean = false
+
+  override def createInstance: T = {
+    Left(null).asInstanceOf[T]
+  }
+
+  override def isImmutableType: Boolean = {
+    (leftSerializer == null || leftSerializer.isImmutableType) &&
+      (rightSerializer == null || rightSerializer.isImmutableType)
+  }
+
+  override def getLength: Int = -1
+
+  override def copy(from: T): T = from match {
+    case Left(a: A) => Left(leftSerializer.copy(a)).asInstanceOf[T]
+    case Right(b: B) => Right(rightSerializer.copy(b)).asInstanceOf[T]
+  }
+
+  override def copy(from: T, reuse: T): T = copy(from)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    val isLeft = source.readBoolean()
+    target.writeBoolean(isLeft)
+    if (isLeft) {
+      leftSerializer.copy(source, target)
+    } else {
+      rightSerializer.copy(source, target)
+    }
+  }
+
+  override def serialize(either: T, target: DataOutputView): Unit = either match {
+    case Left(a: A) =>
+      target.writeBoolean(true)
+      leftSerializer.serialize(a, target)
+    case Right(b: B) =>
+      target.writeBoolean(false)
+      rightSerializer.serialize(b, target)
+  }
+
+  override def deserialize(source: DataInputView): T = {
+    val isLeft = source.readBoolean()
+    if (isLeft) {
+      Left(leftSerializer.deserialize(source)).asInstanceOf[T]
+    } else {
+      Right(rightSerializer.deserialize(source)).asInstanceOf[T]
+    }
+  }
+
+  override def deserialize(reuse: T, source: DataInputView): T = {
+    val isLeft = source.readBoolean()
+    if (isLeft) {
+      Left(leftSerializer.deserialize(source)).asInstanceOf[T]
+    } else {
+      Right(rightSerializer.deserialize(source)).asInstanceOf[T]
+    }
+  }
+
+  override def equals(obj: Any): Boolean = {
+    if (obj != null && obj.isInstanceOf[EitherSerializer[_, _, _]]) {
+      val other = obj.asInstanceOf[EitherSerializer[_, _, _]]
+      other.leftSerializer.equals(leftSerializer) && other.rightSerializer.equals(rightSerializer)
+    } else {
+      false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
new file mode 100644
index 0000000..19c2f90
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+
+/**
+ * TypeInformation [[Either]].
+ */
+class EitherTypeInfo[A, B, T <: Either[A, B]](
+    clazz: Class[T],
+    leftTypeInfo: TypeInformation[A],
+    rightTypeInfo: TypeInformation[B])
+  extends TypeInformation[T] {
+
+  override def isBasicType: Boolean = false
+  override def isTupleType: Boolean = false
+  override def isKeyType: Boolean = false
+  override def getTotalFields: Int = 1
+  override def getArity: Int = 1
+  override def getTypeClass = clazz
+
+  def createSerializer(): TypeSerializer[T] = {
+    val leftSerializer =
+      if (leftTypeInfo != null) leftTypeInfo.createSerializer() else new NothingSerializer
+
+    val rightSerializer =
+      if (rightTypeInfo != null) rightTypeInfo.createSerializer() else new NothingSerializer
+    new EitherSerializer(leftSerializer, rightSerializer)
+  }
+
+  override def toString = s"Either[$leftTypeInfo, $rightTypeInfo]"
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
new file mode 100644
index 0000000..f213f45
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+
+/**
+ * Serializer for cases where not serializer is required but the system still expects one. This
+ * happens for OptionTypeInfo when None ist used, or for Either when one of the sides is
+ * Nothing.
+ */
+class NothingSerializer extends TypeSerializer[Any] {
+
+  override def isStateful: Boolean = false
+
+  override def createInstance: Any = {
+    new Integer(-1)
+  }
+
+  override def isImmutableType: Boolean = true
+
+  override def getLength: Int = -1
+
+  override def copy(from: Any): Any =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
+  override def copy(from: Any, reuse: Any): Any = copy(from)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
+  override def serialize(any: Any, target: DataOutputView): Unit =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
+
+  override def deserialize(source: DataInputView): Any =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
+  override def deserialize(reuse: Any, source: DataInputView): Any =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
+  override def equals(obj: Any): Boolean = {
+    obj != null && obj.isInstanceOf[NothingSerializer]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
new file mode 100644
index 0000000..7e9e4e5
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+
+/**
+ * Serializer for [[Option]].
+ */
+class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
+  extends TypeSerializer[Option[A]] {
+
+  override def isStateful: Boolean = false
+
+  override def createInstance: Option[A] = {
+    None
+  }
+
+  override def isImmutableType: Boolean = elemSerializer == null || elemSerializer.isImmutableType
+
+  override def getLength: Int = -1
+
+  override def copy(from: Option[A]): Option[A] = from match {
+    case Some(a) => Some(elemSerializer.copy(a))
+    case None => from
+  }
+
+  override def copy(from: Option[A], reuse: Option[A]): Option[A] = copy(from)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    val isSome = source.readBoolean()
+    target.writeBoolean(isSome)
+    if (isSome) {
+      elemSerializer.copy(source, target)
+    }
+  }
+
+  override def serialize(either: Option[A], target: DataOutputView): Unit = either match {
+    case Some(a) =>
+      target.writeBoolean(true)
+      elemSerializer.serialize(a, target)
+    case None =>
+      target.writeBoolean(false)
+  }
+
+  override def deserialize(source: DataInputView): Option[A] = {
+    val isSome = source.readBoolean()
+    if (isSome) {
+      Some(elemSerializer.deserialize(source))
+    } else {
+      None
+    }
+  }
+
+  override def deserialize(reuse: Option[A], source: DataInputView): Option[A] = deserialize(source)
+
+  override def equals(obj: Any): Boolean = {
+    if (obj != null && obj.isInstanceOf[OptionSerializer[_]]) {
+      val other = obj.asInstanceOf[OptionSerializer[_]]
+      other.elemSerializer.equals(elemSerializer)
+    } else {
+      false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
new file mode 100644
index 0000000..171db60
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+
+/**
+ * TypeInformation [[Option]].
+ */
+class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])
+  extends TypeInformation[T] {
+
+  override def isBasicType: Boolean = false
+  override def isTupleType: Boolean = false
+  override def isKeyType: Boolean = false
+  override def getTotalFields: Int = 1
+  override def getArity: Int = 1
+  override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]]
+
+  def createSerializer(): TypeSerializer[T] = {
+    if (elemTypeInfo == null) {
+      // this happens when the type of a DataSet is None
+      new OptionSerializer(new NothingSerializer).asInstanceOf[TypeSerializer[T]]
+    } else {
+      new OptionSerializer(elemTypeInfo.createSerializer()).asInstanceOf[TypeSerializer[T]]
+    }
+  }
+
+  override def toString = s"Option[$elemTypeInfo]"
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
new file mode 100644
index 0000000..40071b7
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import java.io.ObjectInputStream
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+
+import scala.collection.generic.CanBuildFrom
+;
+
+/**
+ * Serializer for Scala Collections.
+ */
+abstract class TraversableSerializer[T <: TraversableOnce[E], E](
+    val elementSerializer: TypeSerializer[E])
+  extends TypeSerializer[T] {
+
+  def getCbf: CanBuildFrom[T, E, T]
+
+  @transient var cbf: CanBuildFrom[T, E, T] = getCbf
+
+  private def readObject(in: ObjectInputStream): Unit = {
+    in.defaultReadObject()
+    cbf = getCbf
+  }
+
+  override def createInstance: T = {
+    cbf().result()
+  }
+
+  override def isImmutableType: Boolean = true
+
+  override def getLength: Int = -1
+
+  override def copy(from: T): T = {
+    val builder = cbf()
+    builder.sizeHint(from.size)
+    from foreach { e => builder += e }
+    builder.result()
+  }
+
+  override def copy(from: T, reuse: T): T = copy(from)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    val len = source.readInt()
+    target.writeInt(len)
+
+    var i = 0
+    while (i < len) {
+      val isNonNull = source.readBoolean()
+      target.writeBoolean(isNonNull)
+      if (isNonNull) {
+        elementSerializer.copy(source, target)
+      }
+      i += 1
+    }
+  }
+
+  override def serialize(coll: T, target: DataOutputView): Unit = {
+    val len = coll.size
+    target.writeInt(len)
+    coll foreach { e =>
+      if (e == null) {
+        target.writeBoolean(false)
+      } else {
+        target.writeBoolean(true)
+        elementSerializer.serialize(e, target)
+      }
+    }
+  }
+
+  override def isStateful: Boolean = false
+
+  override def deserialize(source: DataInputView): T = {
+    val len = source.readInt()
+    val builder = cbf()
+
+    var i = 0
+    while (i < len) {
+      val isNonNull = source.readBoolean()
+      if (isNonNull) {
+        builder += elementSerializer.deserialize(source)
+      } else {
+        builder += null.asInstanceOf[E]
+      }
+      i += 1
+    }
+
+    builder.result()
+  }
+
+  override def deserialize(reuse: T, source: DataInputView): T = {
+    val len = source.readInt()
+    val builder = cbf()
+
+    var i = 0
+    while (i < len) {
+      val isNonNull = source.readBoolean()
+      if (isNonNull) {
+        builder += elementSerializer.deserialize(source)
+      } else {
+        builder += null.asInstanceOf[E]
+      }
+      i += 1
+    }
+
+    builder.result()
+  }
+
+  override def equals(obj: Any): Boolean = {
+    if (obj != null && obj.isInstanceOf[TraversableSerializer[_, _]]) {
+      val other = obj.asInstanceOf[TraversableSerializer[_, _]]
+      other.elementSerializer.equals(elementSerializer)
+    } else {
+      false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
new file mode 100644
index 0000000..6ff8c6e
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+
+import scala.collection.generic.CanBuildFrom
+
+/**
+ * TypeInformation for Scala Collections.
+ */
+abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
+    clazz: Class[T],
+    elementTypeInfo: TypeInformation[E])
+  extends TypeInformation[T] {
+
+  override def isBasicType: Boolean = false
+  override def isTupleType: Boolean = false
+  override def isKeyType: Boolean = false
+  override def getTotalFields: Int = 1
+  override def getArity: Int = 1
+  override def getTypeClass: Class[T] = clazz
+
+  def createSerializer(): TypeSerializer[T]
+
+  override def toString = s"Collection[$elementTypeInfo]"
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
new file mode 100644
index 0000000..a923af6
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.flink.api.scala._
+
+
+@RunWith(classOf[Parameterized])
+class ScalaSpecialTypesITCase(config: Configuration) extends JavaProgramTestBase(config) {
+
+  private var curProgId: Int = config.getInteger("ProgramId", -1)
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  protected override def preSubmit(): Unit = {
+    resultPath = getTempDirPath("result")
+  }
+
+  protected def testProgram(): Unit = {
+    expectedResult = curProgId match {
+      case 1 =>
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val nums = env.fromElements(1, 2, 1, 2)
+
+        val eithers = nums.map(_ match {
+          case 1 => Left(10)
+          case 2 => Right(20)
+        })
+
+        val result = eithers.map(_ match {
+          case Left(i) => i
+          case Right(i) => i
+        }).reduce(_ + _).writeAsText(resultPath)
+
+        env.execute()
+
+        "60"
+
+      case 2 =>
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val nums = env.fromElements(1, 2, 1, 2)
+
+        val eithers = nums.map(_ match {
+          case 1 => Left(10)
+          case 2 => Left(20)
+        })
+
+        val result = eithers.map(_ match {
+          case Left(i) => i
+        }).reduce(_ + _).writeAsText(resultPath)
+
+        env.execute()
+
+        "60"
+
+      case 3 =>
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val nums = env.fromElements(1, 2, 1, 2)
+
+        val eithers = nums.map(_ match {
+          case 1 => Right(10)
+          case 2 => Right(20)
+        })
+
+        val result = eithers.map(_ match {
+          case Right(i) => i
+        }).reduce(_ + _).writeAsText(resultPath)
+
+        env.execute()
+
+        "60"
+
+      case 4 =>
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val nums = env.fromElements(1, 2, 1, 2)
+
+        val eithers = nums.map(_ match {
+          case 1 => Some(10)
+          case 2 => None
+        })
+
+        val result = eithers.map(_ match {
+          case Some(i) => i
+          case None => 20
+        }).reduce(_ + _).writeAsText(resultPath)
+
+        env.execute()
+
+        "60"
+
+      case 5 =>
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val nums = env.fromElements(1, 2, 1, 2)
+
+        val eithers = nums.map(_ match {
+          case 1 => Some(10)
+          case 2 => Some(20)
+        })
+
+        val result = eithers.map(_ match {
+          case Some(i) => i
+        }).reduce(_ + _).writeAsText(resultPath)
+
+        env.execute()
+
+        "60"
+
+      case 6 =>
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val nums = env.fromElements(1, 2, 1, 2)
+
+        val eithers = nums.map(_ match {
+          case 1 => None
+          case 2 => None
+        })
+
+        val result = eithers.map(_ match {
+          case None => 20
+        }).reduce(_ + _).writeAsText(resultPath)
+
+        env.execute()
+
+        "80"
+
+      case _ =>
+        throw new IllegalArgumentException("Invalid program id")
+    }
+  }
+
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+}
+
+object ScalaSpecialTypesITCase {
+  var NUM_PROGRAMS: Int = 6
+
+  @Parameters
+  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
+    val configs = mutable.MutableList[Array[AnyRef]]()
+    for (i <- 1 to ScalaSpecialTypesITCase.NUM_PROGRAMS) {
+      val config = new Configuration()
+      config.setInteger("ProgramId", i)
+      configs += Array(config)
+    }
+
+    configs.asJavaCollection
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
new file mode 100644
index 0000000..60651d1
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.junit.Assert._
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, SerializerTestInstance}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.junit.{Assert, Test}
+
+import org.apache.flink.api.scala._
+
+class ScalaSpecialTypesSerializerTest {
+
+  @Test
+  def testOption(): Unit = {
+    val testData = Array(Some("Hello"), Some("Ciao"), None)
+    runTests(testData)
+  }
+
+  @Test
+  def testSome(): Unit = {
+    val testData = Array(Some("Hello"), Some("Ciao"))
+    runTests(testData)
+  }
+
+  @Test
+  def testNone(): Unit = {
+    val testData = Array(None, None)
+    runTests(testData)
+  }
+
+  @Test
+  def testEither(): Unit = {
+    val testData = Array(Left("Hell"), Right(3))
+    runTests(testData)
+  }
+
+  @Test
+  def testLeft(): Unit = {
+    val testData = Array(Left("Hell"), Left("CIao"))
+    runTests(testData)
+  }
+
+  @Test
+  def testRight(): Unit = {
+    val testData = Array(Right("Hell"), Right("CIao"))
+    runTests(testData)
+  }
+
+
+  private final def runTests[T : TypeInformation](instances: Array[T]) {
+    try {
+      val typeInfo = implicitly[TypeInformation[T]]
+      val serializer = typeInfo.createSerializer
+      val typeClass = typeInfo.getTypeClass
+      val test =
+        new ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, -1, instances)
+      test.testAll()
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+      }
+    }
+  }
+}
+
+class ScalaSpecialTypesSerializerTestInstance[T](
+                                                serializer: TypeSerializer[T],
+                                                typeClass: Class[T],
+                                                length: Int,
+                                                testData: Array[T])
+  extends SerializerTestInstance[T](serializer, typeClass, length, testData: _*) {
+
+  @Test
+  override def testInstantiate(): Unit = {
+    try {
+      val serializer: TypeSerializer[T] = getSerializer
+      val instance: T = serializer.createInstance
+      assertNotNull("The created instance must not be null.", instance)
+      val tpe: Class[T] = getTypeClass
+      assertNotNull("The test is corrupt: type class is null.", tpe)
+      // We cannot check this because Collection Instances are not always of the type
+      // that the user writes, they might have generated names.
+      // assertEquals("Type of the instantiated object is wrong.", tpe, instance.getClass)
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+
+  override protected def deepEquals(message: String, should: T, is: T) {
+    should match {
+      case trav: TraversableOnce[_] =>
+        val isTrav = is.asInstanceOf[TraversableOnce[_]]
+        assertEquals(message, trav.size, isTrav.size)
+        val it = trav.toIterator
+        val isIt = isTrav.toIterator
+        while (it.hasNext) {
+          val should = it.next()
+          val is = isIt.next()
+          assertEquals(message, should, is)
+        }
+
+      case _ =>
+        super.deepEquals(message, should, is)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd66a08e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
new file mode 100644
index 0000000..1155d73
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.junit.Assert._
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, SerializerTestInstance}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.junit.{Ignore, Assert, Test}
+
+import org.apache.flink.api.scala._
+
+import scala.collection.immutable.{BitSet, SortedSet, LinearSeq}
+import scala.collection.{SortedMap, mutable}
+
+class TraversableSerializerTest {
+
+  @Test
+  def testSeq(): Unit = {
+    val testData = Array(Seq(1,2,3), Seq(2,3))
+    runTests(testData)
+  }
+
+  @Test
+  def testIndexedSeq(): Unit = {
+    val testData = Array(IndexedSeq(1,2,3), IndexedSeq(2,3))
+    runTests(testData)
+  }
+
+  @Test
+  def testLinearSeq(): Unit = {
+    val testData = Array(LinearSeq(1,2,3), LinearSeq(2,3))
+    runTests(testData)
+  }
+
+  @Test
+  def testMap(): Unit = {
+    val testData = Array(Map("Hello" -> 1, "World" -> 2), Map("Foo" -> 42))
+    runTests(testData)
+  }
+
+  @Test(expected = classOf[InvalidTypesException])
+  def testSortedMap(): Unit = {
+    // SortedSet is not supported right now.
+    val testData = Array(SortedMap("Hello" -> 1, "World" -> 2), SortedMap("Foo" -> 42))
+    runTests(testData)
+  }
+
+  @Test
+  def testSet(): Unit = {
+    val testData = Array(Set(1,2,3,3), Set(2,3))
+    runTests(testData)
+  }
+
+  @Test(expected = classOf[InvalidTypesException])
+  def testSortedSet(): Unit = {
+    // SortedSet is not supported right now.
+    val testData = Array(SortedSet(1,2,3), SortedSet(2,3))
+    runTests(testData)
+  }
+
+  @Test
+  def testBitSet(): Unit = {
+    val testData = Array(BitSet(1,2,3,4), BitSet(2,3,2))
+    runTests(testData)
+  }
+
+  @Test
+  def testMutableList(): Unit = {
+    val testData = Array(mutable.MutableList(1,2,3), mutable.MutableList(2,3,2))
+    runTests(testData)
+  }
+
+  @Test
+  def testStringArray(): Unit = {
+    val testData = Array(Array("Foo", "Bar"), Array("Hello"))
+    runTests(testData)
+  }
+
+  @Test
+  def testIntArray(): Unit = {
+    val testData = Array(Array(1,3,3,7), Array(4,7))
+    runTests(testData)
+  }
+
+  @Test
+  def testArrayWithCaseClass(): Unit = {
+    val testData = Array(Array((1, "String"), (2, "Foo")), Array((4, "String"), (3, "Foo")))
+    runTests(testData)
+  }
+//
+  @Test
+  def testWithCaseClass(): Unit = {
+    val testData = Array(Seq((1, "String"), (2, "Foo")), Seq((4, "String"), (3, "Foo")))
+    runTests(testData)
+  }
+
+  @Test
+  def testWithPojo(): Unit = {
+    val testData = Array(Seq(new Pojo("hey", 1)), Seq(new Pojo("Ciao", 2), new Pojo("Foo", 3)))
+    runTests(testData)
+  }
+
+  @Test
+  @Ignore
+  def testWithMixedPrimitives(): Unit = {
+    // Does not work yet because the GenericTypeInfo used for the elements will
+    // have a typeClass of Object, and therefore not deserializer the elements correctly.
+    // It does work when used in a Job, though. Because the Objects get cast to
+    // the correct type in the user function.
+    val testData = Array(Seq(1,1L,1d,true,"Hello"), Seq(2,2L,2d,false,"Ciao"))
+    runTests(testData)
+  }
+
+
+
+  private final def runTests[T : TypeInformation](instances: Array[T]) {
+    try {
+      val typeInfo = implicitly[TypeInformation[T]]
+      val serializer = typeInfo.createSerializer
+      val typeClass = typeInfo.getTypeClass
+      val test =
+        new ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, -1, instances)
+      test.testAll()
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+      }
+    }
+  }
+}
+
+class Pojo(val name: String, val count: Int) {
+  def this() = this("", -1)
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case oP: Pojo => name == oP.name && count == oP.count
+      case _ => false
+    }
+  }
+}
+
+class ScalaCollectionSerializerTestInstance[T](
+    serializer: TypeSerializer[T],
+    typeClass: Class[T],
+    length: Int,
+    testData: Array[T])
+  extends SerializerTestInstance[T](serializer, typeClass, length, testData: _*) {
+
+  @Test
+  override def testInstantiate(): Unit = {
+    try {
+      val serializer: TypeSerializer[T] = getSerializer
+      val instance: T = serializer.createInstance
+      assertNotNull("The created instance must not be null.", instance)
+      val tpe: Class[T] = getTypeClass
+      assertNotNull("The test is corrupt: type class is null.", tpe)
+      // We cannot check this because Collection Instances are not always of the type
+      // that the user writes, they might have generated names.
+      // assertEquals("Type of the instantiated object is wrong.", tpe, instance.getClass)
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+
+  override protected def deepEquals(message: String, should: T, is: T) {
+    should match {
+      case trav: TraversableOnce[_] =>
+        val isTrav = is.asInstanceOf[TraversableOnce[_]]
+        assertEquals(message, trav.size, isTrav.size)
+        val it = trav.toIterator
+        val isIt = isTrav.toIterator
+        while (it.hasNext) {
+          val should = it.next()
+          val is = isIt.next()
+          assertEquals(message, should, is)
+        }
+
+      case _ =>
+        super.deepEquals(message, should, is)
+    }
+  }
+}
+