Posted to commits@flink.apache.org by al...@apache.org on 2014/07/08 12:09:22 UTC

git commit: First cleanup attempt, mostly on Scala code, to follow guidelines from http://docs.scala-lang.org/style: 1. Remove return statements from Scala code where they are not necessary (end of method definition) 2. Remove extra ; from Scala and Java code

Repository: incubator-flink
Updated Branches:
  refs/heads/master 59287062f -> 3f511953c


First cleanup attempt, mostly on Scala code, to follow the guidelines from http://docs.scala-lang.org/style (illustrated in the sketch after this list):
1. Remove return statements from Scala code where they are not necessary (end of method definition)
2. Remove extra ; from Scala and Java code
3. First pass to abide by the 100-characters-per-line limit for Scala code.
   Will send another PR for the other files as I encounter them.
4. Remove parentheses from empty-argument methods that do not have side effects
   (see http://www.artima.com/pins1ed/composition-and-inheritance.html#i1343251059-1)
5. Remove unused import statements in Scala code as I have encountered them.
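
To make rules 1, 2, 4, and 5 concrete, here is a minimal before/after sketch built
around a hypothetical Counter class (the names are illustrative only and do not
appear in the patch):

  // Hypothetical example; not taken from the patch.
  //
  // Before:
  //   import scala.collection.mutable.ListBuffer  // unused import (rule 5)
  //   class Counter {
  //     private var count = 0;                     // extra ';' (rule 2)
  //     def increment(): Unit = { count += 1 }
  //     def value(): Int = { return count }        // unnecessary 'return' (rule 1)
  //   }
  //
  // After:
  class Counter {
    private var count = 0
    // increment mutates state, so it keeps its parentheses (rule 4)
    def increment(): Unit = { count += 1 }
    // value is a side-effect-free accessor: no 'return', no extra ';',
    // and no parentheses (rules 1, 2, and 4)
    def value: Int = count
  }

Callers then write counter.value rather than counter.value(), following the
uniform access principle referenced in item 4.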

This is a first pass at refactoring and cleaning up the Scala code, to gather comments/responses from the community.
More follow-up PRs will come.

Close #64


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3f511953
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3f511953
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3f511953

Branch: refs/heads/master
Commit: 3f511953cde583a9735f88a600e50ed8fae029a6
Parents: 5928706
Author: Henry Saputra <hs...@apache.org>
Authored: Tue Jul 8 00:38:46 2014 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jul 8 12:11:25 2014 +0200

----------------------------------------------------------------------
 .../examples/scala/wordcount/WordCount.scala    |   2 +-
 .../src/main/scala/Job.scala                    |   4 +-
 .../runtime/hash/HashMatchIteratorITCase.java   |   4 +-
 .../test/util/types/IntPairSerializer.java      |   2 +-
 .../stratosphere/api/scala/AnnotationUtil.scala |  14 +-
 .../stratosphere/api/scala/CompilerHints.scala  | 145 ++++++++++++-----
 .../eu/stratosphere/api/scala/DataSet.scala     |  26 ++-
 .../eu/stratosphere/api/scala/DataSource.scala  |   2 +-
 .../stratosphere/api/scala/ScalaOperator.scala  |  53 +++---
 .../api/scala/operators/MapOperator.scala       | 162 +++++++++++--------
 .../api/scala/operators/UnionOperator.scala     |  10 +-
 .../test/javaApiOperators/FilterITCase.java     |   2 +-
 12 files changed, 267 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/wordcount/WordCount.scala b/stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/wordcount/WordCount.scala
index 791f19e..5cbcba6 100644
--- a/stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/wordcount/WordCount.scala
+++ b/stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/wordcount/WordCount.scala
@@ -33,7 +33,7 @@ class WordCount extends Program with ProgramDescription with Serializable {
     val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { (_, 1) } }
     val counts = words groupBy { case (word, _) => word } reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
 
-    val output = counts.write(wordsOutput, CsvOutputFormat("\n", " "));
+    val output = counts.write(wordsOutput, CsvOutputFormat("\n", " "))
   
     val plan = new ScalaPlan(Seq(output), "Word Count")
     plan.setDefaultParallelism(numSubTasks)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
----------------------------------------------------------------------
diff --git a/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala b/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
index 89a429f..dae5349 100644
--- a/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
+++ b/stratosphere-quickstart/quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
@@ -41,8 +41,8 @@ object RunJobRemote {
     // You will also need to change the name of the jar if you change the
     // project name and/or version. Before running this you also need
     // to run "mvn package" to create the jar.
-    val ex = new RemoteExecutor("localhost", 6123, "target/stratosphere-project-0.1-SNAPSHOT.jar");
-    ex.executePlan(plan);
+    val ex = new RemoteExecutor("localhost", 6123, "target/stratosphere-project-0.1-SNAPSHOT.jar")
+    ex.executePlan(plan)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
index e59f4a6..8ca7ebf 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
@@ -426,7 +426,7 @@ public class HashMatchIteratorITCase {
 			
 			while (iterator.callWithNextKey(matcher, collector));
 			
-			iterator.close();;
+			iterator.close();
 	
 			// assert that each expected match was seen
 			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
@@ -473,7 +473,7 @@ public class HashMatchIteratorITCase {
 			
 			while (iterator.callWithNextKey(matcher, collector));
 			
-			iterator.close();;
+			iterator.close();
 	
 			// assert that each expected match was seen
 			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/types/IntPairSerializer.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/types/IntPairSerializer.java
index b4a3137..af11748 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/types/IntPairSerializer.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/types/IntPairSerializer.java
@@ -69,7 +69,7 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		target.write(source, 8);;
+		target.write(source, 8);
 	}
 
 	public static final class IntPairSerializerFactory implements TypeSerializerFactory<IntPair> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/AnnotationUtil.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/AnnotationUtil.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/AnnotationUtil.scala
index 53fdcda..abcb659 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/AnnotationUtil.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/AnnotationUtil.scala
@@ -121,7 +121,7 @@ object AnnotationUtil {
 
         val fieldSet = new FieldSet(notConstantSet1Annotation.value(): _*)
 
-        for (i <- 0 until scalaOp.getUDF().getOutputLength) {
+        for (i <- 0 until scalaOp.getUDF.getOutputLength) {
           if (!fieldSet.contains(i)) {
             properties.addForwardedField1(i, i)
           }
@@ -141,7 +141,7 @@ object AnnotationUtil {
 
         val fieldSet = new FieldSet(notConstantSet2Annotation.value(): _*)
 
-        for (i <- 0 until scalaOp.getUDF().getOutputLength) {
+        for (i <- 0 until scalaOp.getUDF.getOutputLength) {
           if (!fieldSet.contains(i)) {
             properties.addForwardedField2(i, i)
           }
@@ -166,10 +166,10 @@ object AnnotationUtil {
       }
 
       // get constantSet annotation from stub
-      val constantSet: FunctionAnnotation.ConstantFields = scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation
-      .ConstantFields])
-      val notConstantSet: FunctionAnnotation.ConstantFieldsExcept = scalaOp.getUserCodeAnnotation(
-        classOf[FunctionAnnotation.ConstantFieldsExcept])
+      val constantSet: FunctionAnnotation.ConstantFields =
+        scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation.ConstantFields])
+      val notConstantSet: FunctionAnnotation.ConstantFieldsExcept =
+        scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation.ConstantFieldsExcept])
 
       if (notConstantSet != null && constantSet != null) {
         throw new RuntimeException("Either ConstantFields or ConstantFieldsExcept can be specified, not both.")
@@ -186,7 +186,7 @@ object AnnotationUtil {
           }
         }
 
-        for (i <- 0 until scalaOp.getUDF().getOutputLength) {
+        for (i <- 0 until scalaOp.getUDF.getOutputLength) {
           if (!nonConstant.contains(i)) {
             properties.addForwardedField(i, i)
           }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
index 68afb3c..fd23e03 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
@@ -15,24 +15,30 @@ package eu.stratosphere.api.scala
 
 import language.experimental.macros
 import scala.util.DynamicVariable
+import scala.reflect.macros.Context
 import eu.stratosphere.api.scala.analysis._
-import eu.stratosphere.api.scala.operators.Annotations
 import eu.stratosphere.api.common.operators.util.{ FieldSet => PactFieldSet }
 import eu.stratosphere.api.common.operators.Operator
 import eu.stratosphere.api.scala.codegen.MacroContextHolder
-import scala.reflect.macros.Context
 import eu.stratosphere.types.Record
 
-case class KeyCardinality(key: FieldSelector, isUnique: Boolean, distinctCount: Option[Long], avgNumRecords: Option[Float]) {
+case class KeyCardinality(
+    key: FieldSelector,
+    isUnique: Boolean,
+    distinctCount: Option[Long],
+    avgNumRecords: Option[Float]) {
 
-  @transient private var pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
+  @transient private var pactFieldSets =
+    collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
 
   def getPactFieldSet(contract: Operator[Record] with ScalaOperator[_, _]): PactFieldSet = {
 
-    if (pactFieldSets == null)
-      pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
+    if (pactFieldSets == null) {
+      pactFieldSets =
+        collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
+    }
 
-    val keyCopy = key.copy
+    val keyCopy = key.copy()
     contract.getUDF.attachOutputsToInputs(keyCopy.inputFields)
     val keySet = keyCopy.selectedFields.toIndexSet.toArray
 
@@ -57,27 +63,40 @@ trait OutputHintable[Out] { this: DataSet[Out] =>
     
   def outputSize = contract.getCompilerHints().getOutputSize()
   def outputSize_=(value: Long) = contract.getCompilerHints().setOutputSize(value)
-  def outputSize(value: Long): this.type = { contract.getCompilerHints().setOutputSize(value); this }
+  def outputSize(value: Long): this.type = {
+    contract.getCompilerHints().setOutputSize(value)
+    this
+  }
   
   def outputCardinality = contract.getCompilerHints().getOutputCardinality()
   def outputCardinality_=(value: Long) = contract.getCompilerHints().setOutputCardinality(value)
-  def outputCardinality(value: Long): this.type = { contract.getCompilerHints().setOutputCardinality(value); this }
+  def outputCardinality(value: Long): this.type = {
+    contract.getCompilerHints().setOutputCardinality(value)
+    this
+  }
   
   def avgBytesPerRecord = contract.getCompilerHints().getAvgOutputRecordSize()
   def avgBytesPerRecord_=(value: Float) = contract.getCompilerHints().setAvgOutputRecordSize(value)
-  def avgBytesPerRecord(value: Float): this.type = { contract.getCompilerHints().setAvgOutputRecordSize(value); this }
+  def avgBytesPerRecord(value: Float): this.type = {
+    contract.getCompilerHints().setAvgOutputRecordSize(value)
+    this
+  }
 
   def filterFactor = contract.getCompilerHints().getFilterFactor()
   def filterFactor_=(value: Float) = contract.getCompilerHints().setFilterFactor(value)
-  def filterFactor(value: Float): this.type = { contract.getCompilerHints().setFilterFactor(value); this }
+  def filterFactor(value: Float): this.type = {
+    contract.getCompilerHints().setFilterFactor(value)
+    this
+  }
 
   def uniqueKey[Key](fields: Out => Key) = macro OutputHintableMacros.uniqueKey[Out, Key]
 
   def applyHints(contract: Operator[Record] with ScalaOperator[_, _]): Unit = {
     val hints = contract.getCompilerHints
 
-    if (hints.getUniqueFields != null)
+    if (hints.getUniqueFields != null) {
       hints.getUniqueFields.clear()
+    }
 
     _cardinalities.foreach { card =>
 
@@ -92,7 +111,10 @@ trait OutputHintable[Out] { this: DataSet[Out] =>
 
 object OutputHintableMacros {
   
-  def uniqueKey[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key]): c.Expr[Unit] = {
+  def uniqueKey[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
+      (c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key])
+    : c.Expr[Unit] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
@@ -109,10 +131,14 @@ object OutputHintableMacros {
       
       c.prefix.splice.addCardinality(card)
     }
-    return result
+    result
   }
   
-  def uniqueKeyWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long]): c.Expr[Unit] = {
+  def uniqueKeyWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
+      (c: Context { type PrefixType = OutputHintable[Out] })
+      (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long])
+    : c.Expr[Unit] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
@@ -129,10 +155,13 @@ object OutputHintableMacros {
       
       c.prefix.splice.addCardinality(card)
     }
-    return result
+    result
   }
   
-  def cardinality[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key]): c.Expr[Unit] = {
+  def cardinality[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
+      (c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key])
+    : c.Expr[Unit] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
@@ -149,10 +178,14 @@ object OutputHintableMacros {
       
       c.prefix.splice.addCardinality(card)
     }
-    return result
+    result
   }
   
-  def cardinalityWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long]): c.Expr[Unit] = {
+  def cardinalityWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
+      (c: Context { type PrefixType = OutputHintable[Out] })
+      (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long])
+    : c.Expr[Unit] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
@@ -169,10 +202,12 @@ object OutputHintableMacros {
       
       c.prefix.splice.addCardinality(card)
     }
-    return result
+    result
   }
   
-  def cardinalityWithAvgNumRecords[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key], avgNumRecords: c.Expr[Float]): c.Expr[Unit] = {
+  def cardinalityWithAvgNumRecords[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
+      (c: Context { type PrefixType = OutputHintable[Out] })
+      (fields: c.Expr[Out => Key], avgNumRecords: c.Expr[Float]): c.Expr[Unit] = {
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
@@ -189,10 +224,14 @@ object OutputHintableMacros {
       
       c.prefix.splice.addCardinality(card)
     }
-    return result
+    result
   }
   
-  def cardinalityWithAll[Out: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key], distinctCount: c.Expr[Long], avgNumRecords: c.Expr[Float]): c.Expr[Unit] = {
+  def cardinalityWithAll[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
+      (c: Context { type PrefixType = OutputHintable[Out] })
+      (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long], avgNumRecords: c.Expr[Float])
+    : c.Expr[Unit] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
@@ -201,15 +240,14 @@ object OutputHintableMacros {
 
     val result = reify {
       val contract = c.prefix.splice.getContract
-      val hints = contract.getCompilerHints
       
       val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
+      val key = new FieldSelector(contract.getUDF.outputUDT, keySelection)
       val card = KeyCardinality(key, false, Some(distinctCount.splice), Some(avgNumRecords.splice))
       
       c.prefix.splice.addCardinality(card)
     }
-    return result
+    result
   }
 }
 
@@ -220,9 +258,12 @@ trait InputHintable[In, Out] { this: DataSet[Out] =>
   def getInputUDT: UDT[In]
   def getOutputUDT: UDT[Out]
 
-  def neglects[Fields](fields: In => Fields): Unit = macro InputHintableMacros.neglects[In, Out, Fields]
-  def observes[Fields](fields: In => Fields): Unit = macro InputHintableMacros.observes[In, Out, Fields]
-  def preserves[Fields](from: In => Fields, to: Out => Fields) = macro InputHintableMacros.preserves[In, Out, Fields]
+  def neglects[Fields](fields: In => Fields): Unit =
+    macro InputHintableMacros.neglects[In, Out, Fields]
+  def observes[Fields](fields: In => Fields): Unit =
+    macro InputHintableMacros.observes[In, Out, Fields]
+  def preserves[Fields](from: In => Fields, to: Out => Fields) =
+    macro InputHintableMacros.preserves[In, Out, Fields]
 }
 
 object InputHintable {
@@ -235,7 +276,11 @@ object InputHintable {
 
 object InputHintableMacros {
   
-  def neglects[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag](c: Context { type PrefixType = InputHintable[In, Out] })(fields: c.Expr[In => Fields]): c.Expr[Unit] = {
+  def neglects[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
+      (c: Context { type PrefixType = InputHintable[In, Out] })
+      (fields: c.Expr[In => Fields])
+    : c.Expr[Unit] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
@@ -248,10 +293,14 @@ object InputHintableMacros {
       val unreadFields = fieldSelector.selectedFields.map(_.localPos).toSet
       unreadFields.foreach(c.prefix.splice.markUnread(_))
     }
-    return result
+    result
   }
   
-  def observes[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag](c: Context { type PrefixType = InputHintable[In, Out] })(fields: c.Expr[In => Fields]): c.Expr[Unit] = {
+  def observes[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
+      (c: Context { type PrefixType = InputHintable[In, Out] })
+      (fields: c.Expr[In => Fields])
+    : c.Expr[Unit] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
@@ -265,14 +314,18 @@ object InputHintableMacros {
       val unreadFields = fieldSelector.inputFields.map(_.localPos).toSet.diff(fieldSet)
       unreadFields.foreach(c.prefix.splice.markUnread(_))
     }
-    return result
+    result
   }
   
-  def preserves[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag](c: Context { type PrefixType = InputHintable[In, Out] })(from: c.Expr[In => Fields], to: c.Expr[Out => Fields]): c.Expr[Unit] = {
+  def preserves[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
+      (c: Context { type PrefixType = InputHintable[In, Out] })
+      (from: c.Expr[In => Fields], to: c.Expr[Out => Fields])
+    : c.Expr[Unit] = {
+
     import c.universe._
 
-     val slave = MacroContextHolder.newMacroHelper(c)
-    
+    val slave = MacroContextHolder.newMacroHelper(c)
+
     val generatedFromFieldSelector = slave.getSelector(from)
     val generatedToFieldSelector = slave.getSelector(to)
 
@@ -281,14 +334,16 @@ object InputHintableMacros {
       val fromSelector = new FieldSelector(c.prefix.splice.getInputUDT, fromSelection)
       val toSelection = generatedToFieldSelector.splice
       val toSelector = new FieldSelector(c.prefix.splice.getOutputUDT, toSelection)
-      val pairs = fromSelector.selectedFields.map(_.localPos).zip(toSelector.selectedFields.map(_.localPos))
+      val pairs = fromSelector.selectedFields.map(_.localPos)
+        .zip(toSelector.selectedFields.map(_.localPos))
       pairs.foreach(c.prefix.splice.markCopied.tupled)
     }
-    return result
+    result
   }
 }
 
-trait OneInputHintable[In, Out] extends InputHintable[In, Out] with OutputHintable[Out] { this: DataSet[Out] =>
+trait OneInputHintable[In, Out] extends InputHintable[In, Out] with OutputHintable[Out] {
+  this: DataSet[Out] =>
 	override def markUnread = contract.getUDF.asInstanceOf[UDF1[In, Out]].markInputFieldUnread _ 
 	override def markCopied = contract.getUDF.asInstanceOf[UDF1[In, Out]].markFieldCopied _ 
 	
@@ -298,15 +353,19 @@ trait OneInputHintable[In, Out] extends InputHintable[In, Out] with OutputHintab
 
 trait TwoInputHintable[LeftIn, RightIn, Out] extends OutputHintable[Out] { this: DataSet[Out] =>
   val left = new DataSet[Out](contract) with OneInputHintable[LeftIn, Out] {
-	override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markInputFieldUnread(Left(pos))}
-	override def markCopied = { (from: Int, to: Int) => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Left(from), to)} 
+	override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]]
+    .markInputFieldUnread(Left(pos))}
+	override def markCopied = { (from: Int, to: Int) => contract.getUDF
+    .asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Left(from), to)}
 	override def getInputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].leftInputUDT
 	override def getOutputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].outputUDT
   }
   
   val right = new DataSet[Out](contract) with OneInputHintable[RightIn, Out] {
-	override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markInputFieldUnread(Right(pos))}
-	override def markCopied = { (from: Int, to: Int) => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Right(from), to)} 
+	override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]]
+    .markInputFieldUnread(Right(pos))}
+	override def markCopied = { (from: Int, to: Int) => contract.getUDF
+    .asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Right(from), to)}
 	override def getInputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].rightInputUDT
 	override def getOutputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].outputUDT
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSet.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSet.scala
index d004492..ffc91e9 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSet.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSet.scala
@@ -28,7 +28,8 @@ import eu.stratosphere.types.Record
 
 class DataSet[T] (val contract: Operator[Record] with ScalaOperator[T, Record]) {
   
-  def cogroup[RightIn](rightInput: DataSet[RightIn]) = new CoGroupDataSet[T, RightIn](this, rightInput)
+  def cogroup[RightIn](rightInput: DataSet[RightIn]) =
+    new CoGroupDataSet[T, RightIn](this, rightInput)
   def cross[RightIn](rightInput: DataSet[RightIn]) = new CrossDataSet[T, RightIn](this, rightInput)
   def join[RightIn](rightInput: DataSet[RightIn]) = new JoinDataSet[T, RightIn](this, rightInput)
   
@@ -42,17 +43,28 @@ class DataSet[T] (val contract: Operator[Record] with ScalaOperator[T, Record])
   // reduce without grouping
   def reduce(fun: (T, T) => T) = macro ReduceMacros.globalReduce[T]
   def reduceAll[Out](fun: Iterator[T] => Out) = macro ReduceMacros.globalReduceGroup[T, Out]
-  def combinableReduceAll[Out](fun: Iterator[T] => Out) = macro ReduceMacros.combinableGlobalReduceGroup[T]
+  def combinableReduceAll[Out](fun: Iterator[T] => Out) =
+    macro ReduceMacros.combinableGlobalReduceGroup[T]
 
   def union(secondInput: DataSet[T]) = UnionOperator.impl[T](this, secondInput)
   
-  def iterateWithDelta[DeltaItem](stepFunction: DataSet[T] => (DataSet[T], DataSet[DeltaItem])) = macro IterateMacros.iterateWithDelta[T, DeltaItem]
+  def iterateWithDelta[DeltaItem](stepFunction: DataSet[T] => (DataSet[T], DataSet[DeltaItem])) =
+    macro IterateMacros.iterateWithDelta[T, DeltaItem]
   def iterate(n: Int, stepFunction: DataSet[T] => DataSet[T])= macro IterateMacros.iterate[T]
-  def iterateWithTermination[C](n: Int, stepFunction: DataSet[T] => DataSet[T], terminationFunction: (DataSet[T],
-    DataSet[T]) => DataSet[C]) = macro IterateMacros.iterateWithTermination[T, C]
-  def iterateWithDelta[SolutionKey, WorksetItem](workset: DataSet[WorksetItem], solutionSetKey: T => SolutionKey, stepFunction: (DataSet[T], DataSet[WorksetItem]) => (DataSet[T], DataSet[WorksetItem]), maxIterations: Int) = macro WorksetIterateMacros.iterateWithDelta[T, SolutionKey, WorksetItem]
+  def iterateWithTermination[C](
+      n: Int,
+      stepFunction: DataSet[T] => DataSet[T],
+      terminationFunction: (DataSet[T],DataSet[T]) => DataSet[C]) =
+    macro IterateMacros.iterateWithTermination[T, C]
+  def iterateWithDelta[SolutionKey, WorksetItem](
+      workset: DataSet[WorksetItem],
+      solutionSetKey: T => SolutionKey,
+      stepFunction: (DataSet[T], DataSet[WorksetItem]) =>
+                    (DataSet[T], DataSet[WorksetItem]), maxIterations: Int) =
+    macro WorksetIterateMacros.iterateWithDelta[T, SolutionKey, WorksetItem]
   
   def write(url: String, format: ScalaOutputFormat[T]) = DataSinkOperator.write(this, url, format)
-  def write(url: String, format: ScalaOutputFormat[T], name: String) = DataSinkOperator.write(this, url, format, name)
+  def write(url: String, format: ScalaOutputFormat[T], name: String) =
+    DataSinkOperator.write(this, url, format, name)
   
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSource.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSource.scala
index 97688dc..ff8bc41 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSource.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/DataSource.scala
@@ -37,7 +37,7 @@ object DataSource {
 
         override def getUDF = format.getUDF
 
-        override def persistConfiguration() = format.persistConfiguration(this.getParameters())
+        override def persistConfiguration() = format.persistConfiguration(this.getParameters)
       }
 
 //      case "ext" => new GenericDataSource[GenericInputFormat[_]](format.asInstanceOf[GenericInputFormat[_]], uri.toString)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/ScalaOperator.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/ScalaOperator.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/ScalaOperator.scala
index 7d7c6b5..d1da8dd 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/ScalaOperator.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/ScalaOperator.scala
@@ -26,8 +26,9 @@ import eu.stratosphere.api.common.operators.Union
 import eu.stratosphere.types.Record
 import eu.stratosphere.types.{Nothing => JavaNothing}
 
-trait ScalaOperator[T, UT] { this: Operator[UT] =>
-  def getUDF(): UDF[T]
+trait ScalaOperator[T, UT] {
+  this: Operator[UT] =>
+  def getUDF: UDF[T]
   def getKeys: Seq[FieldSelector] = Seq()
   def persistConfiguration(): Unit = {}
   
@@ -41,14 +42,17 @@ trait ScalaOperator[T, UT] { this: Operator[UT] =>
         for ((key, inputNum) <- getKeys.zipWithIndex) {
           
           val source = key.selectedFields.toSerializerIndexArray
-          val target = optimizerNode map { _.getRemappedKeys(inputNum) } getOrElse { contract.getKeyColumns(inputNum) }
+          val target = optimizerNode map { _.getRemappedKeys(inputNum) } getOrElse {
+            contract.getKeyColumns(inputNum) }
 
-          assert(source.length == target.length, "Attempt to write " + source.length + " key indexes to an array of size " + target.length)
+          assert(source.length == target.length, "Attempt to write " + source.length +
+            " key indexes to an array of size " + target.length)
           System.arraycopy(source, 0, target, 0, source.length)
         }
       }
       
-      case _ if getKeys.size > 0 => throw new UnsupportedOperationException("Attempted to set keys on a contract that doesn't support them")
+      case _ if getKeys.size > 0 => throw new UnsupportedOperationException("Attempted to set " +
+        "keys on a contract that doesn't support them")
       
       case _ =>
     }
@@ -60,11 +64,14 @@ trait ScalaOperator[T, UT] { this: Operator[UT] =>
   protected def annotations: Seq[Annotation] = Seq()
 
   def getUserCodeAnnotation[A <: Annotation](annotationClass: Class[A]): A = {
-    val res = annotations find { _.annotationType().equals(annotationClass) } map { _.asInstanceOf[A] } getOrElse null.asInstanceOf[A]
+    val res = annotations find { _.annotationType().equals(annotationClass) } map {
+      _.asInstanceOf[A] } getOrElse null.asInstanceOf[A]
 //    println("returning ANOOT: " + res + " FOR: " + annotationClass.toString)
 //    res match {
-//      case r : FunctionAnnotation.ConstantFieldsFirst => println("CONSTANT FIELDS FIRST: " + r.value().mkString(","))
-//      case r : FunctionAnnotation.ConstantFieldsSecond => println("CONSTANT FIELDS SECOND: " + r.value().mkString(","))
+//      case r : FunctionAnnotation.ConstantFieldsFirst => println("CONSTANT FIELDS FIRST: " +
+//        r.value().mkString(","))
+//      case r : FunctionAnnotation.ConstantFieldsSecond => println("CONSTANT FIELDS SECOND: " +
+//        r.value().mkString(","))
 //      case _ =>
 //    }
     res
@@ -75,38 +82,46 @@ trait NoOpScalaOperator[In, Out] extends ScalaOperator[Out, Record] { this: Oper
 }
 
 trait HigherOrderScalaOperator[T] extends ScalaOperator[T, Record] { this: Operator[Record] =>
-  override def getUDF(): UDF0[T]
+  override def getUDF: UDF0[T]
 }
 
 trait BulkIterationScalaOperator[T] extends HigherOrderScalaOperator[T] { this: Operator[Record] =>
 }
 
-trait DeltaIterationScalaOperator[T] extends HigherOrderScalaOperator[T] { this: Operator[Record] =>
+trait DeltaIterationScalaOperator[T] extends HigherOrderScalaOperator[T] {
+  this: Operator[Record] =>
   val key: FieldSelector
 }
 
-trait ScalaOutputOperator[In] extends ScalaOperator[Nothing, JavaNothing] { this: Operator[JavaNothing] =>
-  override def getUDF(): UDF1[In, Nothing]
+trait ScalaOutputOperator[In] extends ScalaOperator[Nothing, JavaNothing] {
+  this: Operator[JavaNothing] =>
+  override def getUDF: UDF1[In, Nothing]
 }
 
 trait OneInputScalaOperator[In, Out] extends ScalaOperator[Out, Record] { this: Operator[Record] =>
-  override def getUDF(): UDF1[In, Out]
+  override def getUDF: UDF1[In, Out]
 }
 
-trait TwoInputScalaOperator[In1, In2, Out] extends ScalaOperator[Out, Record] { this: Operator[Record] =>
-  override def getUDF(): UDF2[In1, In2, Out]
+trait TwoInputScalaOperator[In1, In2, Out] extends ScalaOperator[Out, Record] {
+  this: Operator[Record] =>
+  override def getUDF: UDF2[In1, In2, Out]
 }
 
-trait UnionScalaOperator[In] extends TwoInputScalaOperator[In, In, In] { this: Union[Record] =>
-  override def getUDF(): UDF2[In, In, In]
+trait UnionScalaOperator[In] extends TwoInputScalaOperator[In, In, In] {
+  this: Union[Record] =>
+  override def getUDF: UDF2[In, In, In]
 }
 
-trait OneInputKeyedScalaOperator[In, Out] extends OneInputScalaOperator[In, Out] { this: Operator[Record] =>
+trait OneInputKeyedScalaOperator[In, Out] extends OneInputScalaOperator[In, Out] {
+  this: Operator[Record] =>
   val key: FieldSelector
   override def getKeys = Seq(key)
 }
 
-trait TwoInputKeyedScalaOperator[LeftIn, RightIn, Out] extends TwoInputScalaOperator[LeftIn, RightIn, Out] { this: Operator[Record] =>
+trait TwoInputKeyedScalaOperator[LeftIn, RightIn, Out]
+  extends TwoInputScalaOperator[LeftIn, RightIn, Out] {
+
+  this: Operator[Record] =>
   val leftKey: FieldSelector
   val rightKey: FieldSelector
   override def getKeys = Seq(leftKey, rightKey)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/MapOperator.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/MapOperator.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/MapOperator.scala
index 766ae0d..d60b552 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/MapOperator.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/MapOperator.scala
@@ -18,49 +18,54 @@ import scala.reflect.macros.Context
 
 import eu.stratosphere.api.java.record.operators.MapOperator
 import eu.stratosphere.types.Record
-import eu.stratosphere.api.common.operators.Operator
-import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.util.Collector
 
 import eu.stratosphere.api.scala.codegen.{MacroContextHolder, Util}
 import eu.stratosphere.api.scala._
 import eu.stratosphere.api.scala.analysis._
-import eu.stratosphere.api.scala.functions.{MapFunction, FlatMapFunction, FilterFunction, MapFunctionBase}
+import eu.stratosphere.api.scala.functions.{MapFunction, MapFunctionBase}
+import eu.stratosphere.api.scala.functions.{FlatMapFunction, FilterFunction}
 
 object MapMacros {
 
-  def map[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[In => Out]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
+  def map[In: c.WeakTypeTag, Out: c.WeakTypeTag]
+      (c: Context { type PrefixType = DataSet[In] })
+      (fun: c.Expr[In => Out]): c.Expr[DataSet[Out]
+    with OneInputHintable[In, Out]] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
     
 //    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
 
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
-    val stub: c.Expr[MapFunctionBase[In, Out]] = if (fun.actualType <:< weakTypeOf[MapFunction[In, Out]])
-      reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
-    else reify {
-      implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-      implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-      new MapFunctionBase[In, Out] {
-//        val userFun = ClosureCleaner.clean(fun.splice)
-//        val userFun = fun.splice
-        override def map(record: Record, out: Collector[Record]) = {
-          val input = deserializer.deserializeRecyclingOn(record)
-          val output = fun.splice.apply(input)
-
-          record.setNumFields(outputLength)
-
-          for (field <- discard)
-            record.setNull(field)
-
-          serializer.serialize(output, record)
-          out.collect(record)
+    val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
+    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]()
+
+    val stub: c.Expr[MapFunctionBase[In, Out]] =
+      if (fun.actualType <:< weakTypeOf[MapFunction[In, Out]])
+        reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
+      else
+        reify {
+          implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
+          implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
+          new MapFunctionBase[In, Out] {
+            // val userFun = ClosureCleaner.clean(fun.splice)
+            // val userFun = fun.splice
+            override def map(record: Record, out: Collector[Record]) = {
+              val input = deserializer.deserializeRecyclingOn(record)
+              val output = fun.splice.apply(input)
+
+              record.setNumFields(outputLength)
+
+              for (field <- discard)
+                record.setNull(field)
+
+              serializer.serialize(output, record)
+              out.collect(record)
+            }
+          }
         }
-      }
-    }
 
     val contract = reify {
       val input = c.prefix.splice.contract
@@ -78,47 +83,53 @@ object MapMacros {
       stream
     }
 
-    val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
+    val result = c.Expr[DataSet[Out]
+      with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
 
-    return result
+    result
   }
   
-  def flatMap[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[In => Iterator[Out]]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
+  def flatMap[In: c.WeakTypeTag, Out: c.WeakTypeTag]
+      (c: Context { type PrefixType = DataSet[In] })
+      (fun: c.Expr[In => Iterator[Out]]): c.Expr[DataSet[Out]
+    with OneInputHintable[In, Out]] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
     
 //    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
 
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
+    val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
+    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]()
 
-    val stub: c.Expr[MapFunctionBase[In, Out]] = if (fun.actualType <:< weakTypeOf[FlatMapFunction[In, Out]])
-      reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
-    else reify {
-      implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-      implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-      new MapFunctionBase[In, Out] {
-        override def map(record: Record, out: Collector[Record]) = {
-          val input = deserializer.deserializeRecyclingOn(record)
-          val output = fun.splice.apply(input)
+    val stub: c.Expr[MapFunctionBase[In, Out]] =
+      if (fun.actualType <:< weakTypeOf[FlatMapFunction[In, Out]])
+        reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
+      else reify {
+        implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
+        implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
+        new MapFunctionBase[In, Out] {
+          override def map(record: Record, out: Collector[Record]) = {
+            val input = deserializer.deserializeRecyclingOn(record)
+            val output = fun.splice.apply(input)
 
-          if (output.nonEmpty) {
+            if (output.nonEmpty) {
 
-            record.setNumFields(outputLength)
+              record.setNumFields(outputLength)
 
-            for (field <- discard)
-              record.setNull(field)
+              for (field <- discard)
+                record.setNull(field)
 
-            for (item <- output) {
+              for (item <- output) {
 
-              serializer.serialize(item, record)
-              out.collect(record)
+                serializer.serialize(item, record)
+                out.collect(record)
+              }
             }
           }
         }
       }
-    }
     val contract = reify {
       val input = c.prefix.splice.contract
       val generatedStub = ClosureCleaner.clean(stub.splice)
@@ -135,33 +146,41 @@ object MapMacros {
       stream
     }
 
-    val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
+    val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut),
+      contract.tree))
 
-    return result
+    result
   }
   
-  def filter[In: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[In => Boolean]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
+  def filter[In: c.WeakTypeTag]
+      (c: Context { type PrefixType = DataSet[In] })
+      (fun: c.Expr[In => Boolean]): c.Expr[DataSet[In]
+
+    with OneInputHintable[In, In]] = {
+
     import c.universe._
 
     val slave = MacroContextHolder.newMacroHelper(c)
-    
-//    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
 
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-
-    val stub: c.Expr[MapFunctionBase[In, In]] = if (fun.actualType <:< weakTypeOf[FilterFunction[In, In]])
-      reify { fun.splice.asInstanceOf[MapFunctionBase[In, In]] }
-    else reify {
-      implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-      new MapFunctionBase[In, In] {
-        override def map(record: Record, out: Collector[Record]) = {
-          val input = deserializer.deserializeRecyclingOn(record)
-          if (fun.splice.apply(input)) {
-        	  out.collect(record)
+    //    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
+
+    val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
+
+    val stub: c.Expr[MapFunctionBase[In, In]] =
+      if (fun.actualType <:< weakTypeOf[FilterFunction[In, In]])
+        reify { fun.splice.asInstanceOf[MapFunctionBase[In, In]] }
+      else
+        reify {
+          implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
+          new MapFunctionBase[In, In] {
+            override def map(record: Record, out: Collector[Record]) = {
+              val input = deserializer.deserializeRecyclingOn(record)
+              if (fun.splice.apply(input)) {
+        	      out.collect(record)
+              }
+            }
           }
         }
-      }
-    }
     val contract = reify {
       val input = c.prefix.splice.contract
       val generatedStub = ClosureCleaner.clean(stub.splice)
@@ -175,14 +194,15 @@ object MapMacros {
       }
       val stream = new DataSet[In](contract) with OneInputHintable[In, In] {}
       contract.persistHints = { () =>
-        stream.applyHints(contract);
+        stream.applyHints(contract)
         0 until generatedStub.udf.getOutputLength foreach { i => stream.markCopied(i, i) }
       }
       stream
     }
 
-    val result = c.Expr[DataSet[In] with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
+    val result = c.Expr[DataSet[In]
+      with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
 
-    return result
+    result
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/UnionOperator.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/UnionOperator.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/UnionOperator.scala
index af58d21..d3f2e85 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/UnionOperator.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/operators/UnionOperator.scala
@@ -23,12 +23,14 @@ import eu.stratosphere.types.Record
 object UnionOperator {
 
   def impl[In](firstInput: DataSet[In], secondInput: DataSet[In]): DataSet[In] = {
-    val union = new Union[Record](firstInput.contract, secondInput.contract) with UnionScalaOperator[In] {
-      private val inputUDT = firstInput.contract.getUDF().outputUDT
+    val union = new Union[Record](firstInput.contract, secondInput.contract)
+      with UnionScalaOperator[In] {
+
+      private val inputUDT = firstInput.contract.getUDF.outputUDT
       private val udf: UDF2[In, In, In] = new UDF2(inputUDT, inputUDT, inputUDT)
 
-      override def getUDF = udf;
+      override def getUDF = udf
     }
-    return new DataSet(union)
+    new DataSet(union)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f511953/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/FilterITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/FilterITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/FilterITCase.java
index 6d4aed8..ffa804e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/FilterITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/FilterITCase.java
@@ -319,7 +319,7 @@ public class FilterITCase extends JavaProgramTestBase {
 							public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
 								return (value.f1 == (broadcastSum / 11));
 							}
-						}).withBroadcastSet(intDs, "ints");;
+						}).withBroadcastSet(intDs, "ints");
 				filterDs.writeAsCsv(resultPath);
 				env.execute();