Posted to commits@flink.apache.org by se...@apache.org on 2015/09/09 20:16:40 UTC

[1/5] flink git commit: [FLINK-2619] [tests] Fix for some unexecuted Scala tests

Repository: flink
Updated Branches:
  refs/heads/master 063b1092b -> 361947d6c


[FLINK-2619] [tests] Fix for some unexecuted Scala tests

This closes #1103


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9edd9a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9edd9a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9edd9a8

Branch: refs/heads/master
Commit: c9edd9a88ab20bc314470fb3eb7da4c643dd7d57
Parents: 063b109
Author: Chiwan Park <ch...@apache.org>
Authored: Mon Sep 7 23:03:56 2015 +0900
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 18:03:13 2015 +0200

----------------------------------------------------------------------
 .../ExecutionGraphRestartTest.scala             |   5 +-
 .../TaskManagerLossFailsTasksTest.scala         |   5 +-
 .../jobmanager/JobManagerRegistrationTest.scala |   4 +-
 .../test/operations/GraphOperationsITCase.scala |   1 +
 .../flink/api/scala/ScalaShellITSuite.scala     |  85 +++----
 .../manual/MassiveCaseClassSortingITCase.scala  | 243 +++++++++++++++++++
 .../misc/MassiveCaseClassSortingITCase.scala    | 240 ------------------
 7 files changed, 298 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index 6060bc3..434a8cb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -26,8 +26,11 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
+@RunWith(classOf[JUnitRunner])
 class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
 
   val NUM_TASKS = 31
@@ -118,7 +121,7 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         }
 
         eg.getState should equal(JobStatus.FINISHED)
-      }catch{
+      } catch {
         case t: Throwable =>
           t.printStackTrace()
           fail(t.getMessage)
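
The change above is the core of the fix: ScalaTest suites that only extend WordSpecLike are invisible to Maven Surefire, which discovers JUnit tests, so they compiled but never ran. A minimal sketch of the pattern (the suite name is hypothetical):

~~~scala
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Matchers, WordSpecLike}

// Without @RunWith, a pure-ScalaTest suite is not picked up by JUnit-based
// build tooling such as Maven Surefire, so its tests silently never execute.
@RunWith(classOf[JUnitRunner])
class ExampleSpec extends WordSpecLike with Matchers {

  "an annotated suite" must {
    "be executed by the JUnit runner" in {
      (1 + 1) should equal (2)
    }
  }
}
~~~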

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 3d0d3a5..177dc85 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -26,8 +26,11 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
+@RunWith(classOf[JUnitRunner])
 class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
   "A task manager loss" must {
@@ -64,7 +67,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
         instance1.markDead()
         eg.getState should equal(JobStatus.FAILING)
-      }catch{
+      } catch {
         case t:Throwable =>
           t.printStackTrace()
           fail(t.getMessage)

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 4368309..7487670 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -27,10 +27,11 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
 import org.junit.Assert.{assertNotEquals, assertNotNull}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 import scala.concurrent.duration._
@@ -41,6 +42,7 @@ import scala.language.postfixOps
  * It also tests the JobManager's response to heartbeats from TaskManagers it does
  * not know.
  */
+@RunWith(classOf[JUnitRunner])
 class JobManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with
 ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
index d49e565..713eb8d 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.graph.scala.test.operations
 
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.api.scala._

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 9717ae7..e932cd2 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -22,17 +22,20 @@ import java.io._
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.test.util.{TestEnvironment, TestBaseUtils, ForkableFlinkMiniCluster, FlinkTestBase}
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite, Matchers}
+import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.concurrent.duration.FiniteDuration
 import scala.tools.nsc.Settings
 
+@RunWith(classOf[JUnitRunner])
 class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
   test("Iteration test with iterative Pi example") {
 
-    val input : String =
+    val input: String =
       """
         val initial = env.fromElements(0)
 
@@ -46,9 +49,9 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
         }
         val result = count map { c => c / 10000.0 * 4 }
         result.collect()
-    """.stripMargin
+      """.stripMargin
 
-    val output : String = processInShell(input)
+    val output: String = processInShell(input)
 
     output should not include "failed"
     output should not include "error"
@@ -56,7 +59,8 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   test("WordCount in Shell") {
-    val input = """
+    val input =
+      """
         val text = env.fromElements("To be, or not to be,--that is the question:--",
         "Whether 'tis nobler in the mind to suffer",
         "The slings and arrows of outrageous fortune",
@@ -64,7 +68,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
         val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
         val result = counts.print()
-    """.stripMargin
+      """.stripMargin
 
     val output = processInShell(input)
 
@@ -72,7 +76,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     output should not include "error"
     output should not include "Exception"
 
-//    some of the words that should be included
+    // some of the words that should be included
     output should include("(a,1)")
     output should include("(whether,1)")
     output should include("(to,4)")
@@ -80,14 +84,14 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   test("Sum 1..10, should be 55") {
-    val input : String =
+    val input =
       """
         val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
         val reduced = input.reduce(_+_)
         reduced.print
       """.stripMargin
 
-    val output : String = processInShell(input)
+    val output = processInShell(input)
 
     output should not include "failed"
     output should not include "error"
@@ -97,7 +101,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   test("WordCount in Shell with custom case class") {
-    val input : String =
+    val input =
       """
       case class WC(word: String, count: Int)
 
@@ -111,7 +115,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
       reduced.print()
       """.stripMargin
 
-    val output : String = processInShell(input)
+    val output = processInShell(input)
 
     output should not include "failed"
     output should not include "error"
@@ -120,11 +124,9 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     output should include("WC(hello,1)")
     output should include("WC(world,10)")
   }
-  
-  
+
   test("Submit external library") {
-    
-    val input : String =
+    val input =
       """
         import org.apache.flink.ml.math._
         val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
@@ -132,12 +134,14 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
       """.stripMargin
 
     // find jar file that contains the ml code
-    var externalJar : String = ""
-    var folder : File = new File("../flink-ml/target/");
-    var listOfFiles : Array[File] = folder.listFiles();
-    for(i <- 0 to listOfFiles.length - 1){
-      var filename : String = listOfFiles(i).getName();
-      if(!filename.contains("test") && !filename.contains("original") && filename.contains(".jar")){
+    var externalJar = ""
+    val folder = new File("../flink-ml/target/")
+    val listOfFiles = folder.listFiles()
+
+    for (i <- listOfFiles.indices) {
+      val filename: String = listOfFiles(i).getName
+      if (!filename.contains("test") && !filename.contains("original") && filename.contains(
+        ".jar")) {
         println("ive found file:" + listOfFiles(i).getAbsolutePath)
         externalJar = listOfFiles(i).getAbsolutePath
       }
@@ -145,13 +149,13 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
     assert(externalJar != "")
 
-    val output : String = processInShell(input,Option(externalJar))
+    val output: String = processInShell(input, Option(externalJar))
 
     output should not include "failed"
     output should not include "error"
     output should not include "Exception"
 
-    output should include( "\nDenseVector(1.0, 2.0, 3.0)")
+    output should include("\nDenseVector(1.0, 2.0, 3.0)")
   }
 
   /**
@@ -159,8 +163,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
    * @param input commands to be processed in the shell
    * @return output of shell
    */
-  def processInShell(input : String, externalJars : Option[String] = None): String ={
-    
+  def processInShell(input: String, externalJars: Option[String] = None): String = {
     val in = new BufferedReader(new StringReader(input + "\n"))
     val out = new StringWriter()
     val baos = new ByteArrayOutputStream()
@@ -174,28 +177,26 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
       case Some(c) => c.getLeaderRPCPort
       case _ => throw new RuntimeException("Test cluster not initialized.")
     }
-    
-    var repl : FlinkILoop= null 
-    
-    externalJars match {
-      case Some(ej) => repl = new FlinkILoop(
-        host, port,  
-        Option(Array(ej)), 
+
+    val repl = externalJars match {
+      case Some(ej) => new FlinkILoop(
+        host, port,
+        Option(Array(ej)),
+        in, new PrintWriter(out))
+
+      case None => new FlinkILoop(
+        host, port,
         in, new PrintWriter(out))
-        
-      case None => repl = new FlinkILoop(
-        host,port,
-        in,new PrintWriter(out))
     }
-    
+
     repl.settings = new Settings()
 
     // enable this line to use scala in intellij
     repl.settings.usejavacp.value = true
-    
+
     externalJars match {
       case Some(ej) => repl.settings.classpath.value = ej
-      case None => 
+      case None =>
     }
 
     repl.process(repl.settings)
@@ -205,7 +206,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     System.setOut(oldOut)
 
     baos.flush()
-    
+
     val stdout = baos.toString
 
     out.toString + stdout
@@ -230,6 +231,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   override def afterAll(): Unit = {
-    cluster.map(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
+    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
   }
 }
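
Two idioms recur in the cleanup above: `match` used as an expression to initialize a `val` (replacing a pre-declared mutable `var`), and `Option.foreach` instead of `Option.map` when the body runs only for its side effect. A small self-contained sketch with hypothetical names:

~~~scala
// `match` is an expression, so its result can initialize a `val` directly,
// avoiding a mutable `var` assigned inside each case.
def greeting(name: Option[String]): String = {
  val text = name match {
    case Some(n) => "Hello, " + n
    case None    => "Hello, world"
  }
  text
}

// For side effects on an Option, `foreach` states the intent; `map` would
// build and then discard a wrapped result.
def printGreeting(maybeName: Option[String]): Unit =
  maybeName.foreach(n => println(greeting(Some(n))))
~~~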

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
new file mode 100644
index 0000000..7385fa2
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.manual
+
+import java.io.File
+import java.util.Random
+import java.io.BufferedWriter
+import java.io.FileWriter
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.scala._
+import java.io.BufferedReader
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
+import java.io.FileReader
+import org.apache.flink.util.MutableObjectIterator
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory
+import org.junit.Assert._
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
+
+/**
+ * This test is written as a manual test.
+ */
+class MassiveCaseClassSortingITCase {
+  
+  val SEED : Long = 347569784659278346L
+  
+  def testStringTuplesSorting() {
+    
+    val NUM_STRINGS = 3000000
+    var input: File = null
+    var sorted: File = null
+    
+    try {
+      input = generateFileWithStringTuples(NUM_STRINGS,
+                                           "http://some-uri.com/that/is/a/common/prefix/to/all")
+        
+      sorted = File.createTempFile("sorted_strings", "txt")
+      
+      val command = Array("/bin/bash", "-c", "export LC_ALL=\"C\" && cat \""
+                        + input.getAbsolutePath + "\" | sort > \"" + sorted.getAbsolutePath + "\"")
+
+      var p: Process = null
+      try {
+        p = Runtime.getRuntime.exec(command)
+        val retCode = p.waitFor()
+        if (retCode != 0) {
+          throw new Exception("Command failed with return code " + retCode)
+        }
+        p = null
+      }
+      finally {
+        if (p != null) {
+          p.destroy()
+        }
+      }
+      
+      var sorter: UnilateralSortMerger[StringTuple] = null
+      
+      var reader: BufferedReader = null
+      var verifyReader: BufferedReader = null
+      
+      try {
+        reader = new BufferedReader(new FileReader(input))
+        val inputIterator = new StringTupleReader(reader)
+        
+        val typeInfo = implicitly[TypeInformation[StringTuple]]
+          .asInstanceOf[CompositeType[StringTuple]]
+        
+        val serializer = typeInfo.createSerializer(new ExecutionConfig)
+        val comparator = typeInfo.createComparator(
+          Array(0, 1),
+          Array(true, true),
+          0,
+          new ExecutionConfig)
+        
+        val mm = new MemoryManager(1024 * 1024, 1)
+        val ioMan = new IOManagerAsync()
+        
+        sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
+              new DummyInvokable(), 
+              new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
+              comparator, 1.0, 4, 0.8f)
+            
+        val sortedData = sorter.getIterator
+        reader.close()
+        
+        verifyReader = new BufferedReader(new FileReader(sorted))
+        val verifyIterator = new StringTupleReader(verifyReader)
+        
+        var num = 0
+        var hasMore = true
+        
+        while (hasMore) {
+          val next = verifyIterator.next(null)
+          
+          if (next != null ) {
+            num += 1
+            
+            val nextFromFlinkSort = sortedData.next(null)
+            
+            assertNotNull(nextFromFlinkSort)
+            
+            assertEquals(next.key1, nextFromFlinkSort.key1)
+            assertEquals(next.key2, nextFromFlinkSort.key2)
+            
+            // assert array equals does not work here
+            assertEquals(next.value.length, nextFromFlinkSort.value.length)
+            for (i <- 0 until next.value.length) {
+              assertEquals(next.value(i), nextFromFlinkSort.value(i))
+            }
+            
+          }
+          else {
+            hasMore = false
+          }
+        }
+        
+        assertNull(sortedData.next(null))
+        assertEquals(NUM_STRINGS, num)
+      }
+      finally {
+        if (reader != null) {
+          reader.close()
+        }
+        if (verifyReader != null) {
+          verifyReader.close()
+        }
+        if (sorter != null) {
+          sorter.close()
+        }
+      }
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        e.getMessage
+      }
+    }
+    finally {
+      if (input != null) {
+        input.delete()
+      }
+      if (sorted != null) {
+        sorted.delete()
+      }
+    }
+  }
+  
+  
+  private def generateFileWithStringTuples(numStrings: Int, prefix: String): File = {
+    val rnd = new Random(SEED)
+    val bld = new StringBuilder()
+    val f = File.createTempFile("strings", "txt")
+    
+    var wrt: BufferedWriter = null
+    
+    try {
+      wrt = new BufferedWriter(new FileWriter(f))
+
+      for (i <- 0 until numStrings) {
+        bld.setLength(0)
+        val numComps = rnd.nextInt(5) + 2
+        
+        for (z <- 0 until numComps) {
+          if (z > 0) {
+            bld.append(' ')
+          }
+          bld.append(prefix)
+          val len = rnd.nextInt(20) + 10
+          
+          for (k <- 0 until len) {
+            val c = (rnd.nextInt(80) + 40).toChar
+            bld.append(c)
+          }
+        }
+        val str = bld.toString
+        wrt.write(str)
+        wrt.newLine()
+      }
+    }
+    finally {
+      wrt.close()
+    }
+    f
+  }
+}
+
+object MassiveCaseClassSortingITCase {
+  
+  def main(args: Array[String]) {
+    new MassiveCaseClassSortingITCase().testStringTuplesSorting()
+  }
+}
+
+case class StringTuple(key1: String, key2: String, value: Array[String])
+  
+class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterator[StringTuple] {
+  
+  override def next(reuse: StringTuple): StringTuple = {
+    val line = reader.readLine()
+    if (line == null) {
+      return null
+    }
+    val parts = line.split(" ")
+    StringTuple(parts(0), parts(1), parts)
+  }
+
+  override def next(): StringTuple = {
+    val line = reader.readLine()
+    if (line == null) {
+      return null
+    }
+    val parts = line.split(" ")
+    StringTuple(parts(0), parts(1), parts)
+  }
+
+}
+
+class DummyInvokable extends AbstractInvokable {
+
+  override def registerInputOutput() = {}
+  override def invoke() = {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9edd9a8/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
deleted file mode 100644
index dd27eb5..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.misc
-
-import java.io.File
-import java.util.Random
-import java.io.BufferedWriter
-import java.io.FileWriter
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.scala._
-import java.io.BufferedReader
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
-import java.io.FileReader
-import org.apache.flink.util.MutableObjectIterator
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
-import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory
-import org.junit.Assert._
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
-
-class MassiveCaseClassSortingITCase {
-  
-  val SEED : Long = 347569784659278346L
-  
-  def testStringTuplesSorting() {
-    
-    val NUM_STRINGS = 3000000
-    var input: File = null
-    var sorted: File = null
-    
-    try {
-      input = generateFileWithStringTuples(NUM_STRINGS,
-                                           "http://some-uri.com/that/is/a/common/prefix/to/all")
-        
-      sorted = File.createTempFile("sorted_strings", "txt")
-      
-      val command = Array("/bin/bash", "-c", "export LC_ALL=\"C\" && cat \""
-                        + input.getAbsolutePath + "\" | sort > \"" + sorted.getAbsolutePath + "\"")
-
-      var p: Process = null
-      try {
-        p = Runtime.getRuntime.exec(command)
-        val retCode = p.waitFor()
-        if (retCode != 0) {
-          throw new Exception("Command failed with return code " + retCode)
-        }
-        p = null
-      }
-      finally {
-        if (p != null) {
-          p.destroy()
-        }
-      }
-      
-      var sorter: UnilateralSortMerger[StringTuple] = null
-      
-      var reader: BufferedReader = null
-      var verifyReader: BufferedReader = null
-      
-      try {
-        reader = new BufferedReader(new FileReader(input))
-        val inputIterator = new StringTupleReader(reader)
-        
-        val typeInfo = implicitly[TypeInformation[StringTuple]]
-          .asInstanceOf[CompositeType[StringTuple]]
-        
-        val serializer = typeInfo.createSerializer(new ExecutionConfig)
-        val comparator = typeInfo.createComparator(
-          Array(0, 1),
-          Array(true, true),
-          0,
-          new ExecutionConfig)
-        
-        val mm = new MemoryManager(1024 * 1024, 1)
-        val ioMan = new IOManagerAsync()
-        
-        sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
-              new DummyInvokable(), 
-              new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
-              comparator, 1.0, 4, 0.8f)
-            
-        val sortedData = sorter.getIterator
-        reader.close()
-        
-        verifyReader = new BufferedReader(new FileReader(sorted))
-        val verifyIterator = new StringTupleReader(verifyReader)
-        
-        var num = 0
-        var hasMore = true
-        
-        while (hasMore) {
-          val next = verifyIterator.next(null)
-          
-          if (next != null ) {
-            num += 1
-            
-            val nextFromFlinkSort = sortedData.next(null)
-            
-            assertNotNull(nextFromFlinkSort)
-            
-            assertEquals(next.key1, nextFromFlinkSort.key1)
-            assertEquals(next.key2, nextFromFlinkSort.key2)
-            
-            // assert array equals does not work here
-            assertEquals(next.value.length, nextFromFlinkSort.value.length)
-            for (i <- 0 until next.value.length) {
-              assertEquals(next.value(i), nextFromFlinkSort.value(i))
-            }
-            
-          }
-          else {
-            hasMore = false
-          }
-        }
-        
-        assertNull(sortedData.next(null))
-        assertEquals(NUM_STRINGS, num)
-      }
-      finally {
-        if (reader != null) {
-          reader.close()
-        }
-        if (verifyReader != null) {
-          verifyReader.close()
-        }
-        if (sorter != null) {
-          sorter.close()
-        }
-      }
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        e.getMessage
-      }
-    }
-    finally {
-      if (input != null) {
-        input.delete()
-      }
-      if (sorted != null) {
-        sorted.delete()
-      }
-    }
-  }
-  
-  
-  private def generateFileWithStringTuples(numStrings: Int, prefix: String): File = {
-    val rnd = new Random(SEED)
-    val bld = new StringBuilder()
-    val f = File.createTempFile("strings", "txt")
-    
-    var wrt: BufferedWriter = null
-    
-    try {
-      wrt = new BufferedWriter(new FileWriter(f))
-
-      for (i <- 0 until numStrings) {
-        bld.setLength(0)
-        val numComps = rnd.nextInt(5) + 2
-        
-        for (z <- 0 until numComps) {
-          if (z > 0) {
-            bld.append(' ')
-          }
-          bld.append(prefix)
-          val len = rnd.nextInt(20) + 10
-          
-          for (k <- 0 until len) {
-            val c = (rnd.nextInt(80) + 40).toChar
-            bld.append(c)
-          }
-        }
-        val str = bld.toString
-        wrt.write(str)
-        wrt.newLine()
-      }
-    }
-    finally {
-      wrt.close()
-    }
-    f
-  }
-}
-
-object MassiveCaseClassSortingITCase {
-  
-  def main(args: Array[String]) {
-    new MassiveCaseClassSortingITCase().testStringTuplesSorting()
-  }
-}
-
-case class StringTuple(key1: String, key2: String, value: Array[String])
-  
-class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterator[StringTuple] {
-  
-  override def next(reuse: StringTuple): StringTuple = {
-    val line = reader.readLine()
-    if (line == null) {
-      return null
-    }
-    val parts = line.split(" ")
-    StringTuple(parts(0), parts(1), parts)
-  }
-
-  override def next(): StringTuple = {
-    val line = reader.readLine()
-    if (line == null) {
-      return null
-    }
-    val parts = line.split(" ")
-    StringTuple(parts(0), parts(1), parts)
-  }
-
-}
-
-class DummyInvokable extends AbstractInvokable {
-
-  override def registerInputOutput() = {}
-  override def invoke() = {}
-}


[4/5] flink git commit: [FLINK-2648] [tests] Fix flaky CombineTaskTest and improve cancelling in GroupReduceCombineDriver

Posted by se...@apache.org.
[FLINK-2648] [tests] Fix flaky CombineTaskTest and improve cancelling in GroupReduceCombineDriver


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/361947d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/361947d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/361947d6

Branch: refs/heads/master
Commit: 361947d6c5490f0c6d04ffec709a353995aad373
Parents: 4b6eae5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 9 18:59:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 19:10:51 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/GroupReduceCombineDriver.java      |  4 +++-
 .../apache/flink/runtime/operators/CombineTaskTest.java  | 11 +++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/361947d6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 028ed95..7115a4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -184,7 +184,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 		}
 
 		// sort, combine, and send the final batch
-		sortAndCombine();
+		if (running) {
+			sortAndCombine();
+		}
 	}
 
 	private void sortAndCombine() throws Exception {
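
The guard mirrors the driver's cancellation contract: `cancel()` clears a volatile `running` flag, and the final batch must not be sorted and emitted once the task has been cancelled. A sketch of the shape, with hypothetical names:

~~~scala
// Hypothetical driver skeleton illustrating the guard added above: the
// final sort-and-combine of buffered records runs only if the task was
// not cancelled while the main loop was draining its input.
class CancellableDriver(sortAndCombine: () => Unit) {

  @volatile private var running = true

  def cancel(): Unit = {
    running = false
  }

  def finishFinalBatch(): Unit = {
    if (running) {
      sortAndCombine()
    }
  }
}
~~~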

http://git-wip-us.apache.org/repos/asf/flink/blob/361947d6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 932e746..b6ce2d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -53,10 +53,12 @@ public class CombineTaskTest
 	
 	private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<Tuple2<Integer, Integer>>();
 
+	@SuppressWarnings("unchecked")
 	private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<Tuple2<Integer, Integer>>(
 			(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
 			new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE });
 	
+	
 	private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<Tuple2<Integer, Integer>>(
 			new int[]{0},
 			new TypeComparator<?>[] { new IntComparator(true) },
@@ -179,9 +181,14 @@ public class CombineTaskTest
 			testTask.cancel();
 			
 			// make sure it reacts to the canceling in some time
-			taskRunner.join(5000);
+			long deadline = System.currentTimeMillis() + 10000;
+			do {
+				taskRunner.interrupt();
+				taskRunner.join(5000);
+			}
+			while (taskRunner.isAlive() && System.currentTimeMillis() < deadline);
 			
-			assertFalse("Task did not cancel properly within in 5 seconds.", taskRunner.isAlive());
+			assertFalse("Task did not cancel properly within in 10 seconds.", taskRunner.isAlive());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
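
On the test side, a single `join(5000)` can miss a cancellation that is blocked on I/O or a lock; the fix interrupts and re-joins in a loop under an overall deadline. A sketch of that wait pattern, assuming a plain `Thread`:

~~~scala
// Repeatedly interrupt and join with a bounded overall deadline, rather
// than relying on one fixed join timeout; returns true if the thread
// terminated in time.
def awaitTermination(worker: Thread, timeoutMillis: Long): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMillis
  do {
    worker.interrupt()   // wake the thread if it is blocked
    worker.join(5000)    // wait one bounded slice for it to exit
  } while (worker.isAlive && System.currentTimeMillis() < deadline)
  !worker.isAlive
}
~~~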


[2/5] flink git commit: [FLINK-2638] [core] Add @SafeVarargs to the ExecutionEnvironment's "fromElements(...)" method.

Posted by se...@apache.org.
[FLINK-2638] [core] Add @SafeVarargs to the ExecutionEnvironment's "fromElements(...)" method.

This closes #1109


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0502e4f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0502e4f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0502e4f

Branch: refs/heads/master
Commit: d0502e4fb9338c6ed44a2c0a925aa2f60ebeb7f2
Parents: 95d035a
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 8 17:22:23 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 19:10:51 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/api/java/ExecutionEnvironment.java | 3 ++-
 .../streaming/api/environment/StreamExecutionEnvironment.java     | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0502e4f/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index d3d8192..32049c4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -703,7 +703,8 @@ public abstract class ExecutionEnvironment {
 	 * @param data The elements to make up the data set.
 	 * @return A DataSet representing the given list of elements.
 	 */
-	public <X> DataSource<X> fromElements(X... data) {
+	@SafeVarargs
+	public final <X> DataSource<X> fromElements(X... data) {
 		if (data == null) {
 			throw new IllegalArgumentException("The data must not be null.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0502e4f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2352623..f052389 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -553,7 +553,8 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given array of elements
 	 */
-	public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
+	@SafeVarargs
+	public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
 		if (data.length == 0) {
 			throw new IllegalArgumentException("fromElements needs at least one element as argument");
 		}


[3/5] flink git commit: [FLINK-2003] [docs] Added instructions for encrypted filesystems

Posted by se...@apache.org.
[FLINK-2003] [docs] Added instructions for encrypted filesystems

This closes #1100


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b6eae5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b6eae5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b6eae5a

Branch: refs/heads/master
Commit: 4b6eae5a612ab1b28b64715acd831dccc0ea6d35
Parents: d0502e4
Author: Theodore Vasiloudis <tv...@sics.se>
Authored: Mon Sep 7 16:25:58 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 19:10:51 2015 +0200

----------------------------------------------------------------------
 docs/setup/building.md | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b6eae5a/docs/setup/building.md
----------------------------------------------------------------------
diff --git a/docs/setup/building.md b/docs/setup/building.md
index 2581812..ce59761 100644
--- a/docs/setup/building.md
+++ b/docs/setup/building.md
@@ -122,6 +122,26 @@ Flink is developed against Scala *2.10*, and tested additionally against Scala *
 Newer versions may be compatible, depending on breaking changes in the language features used by Flink, and the availability of Flink's dependencies in those Scala versions. The dependencies written in Scala include for example *Kafka*, *Akka*, *Scalatest*, and *scopt*.
 
 
+## Building in encrypted filesystems
+
+If your home directory is encrypted you might encounter a `java.io.IOException: File 
+name too long` exception. Some encrypted file systems, like encfs used by Ubuntu, do not allow
+long filenames, which is the cause of this error.
+
+The workaround is to add:
+
+~~~xml
+<args>
+    <arg>-Xmax-classfile-name</arg>
+    <arg>128</arg>
+</args>
+~~~
+
+in the compiler configuration of the `pom.xml` file of the module causing the error. For example,
+if the error appears in the `flink-yarn` module, the above code should 
+be added under the `<configuration>` tag of `scala-maven-plugin`. See 
+[this issue](https://issues.apache.org/jira/browse/FLINK-2003) for more information.
+
 ## Background
 
 The builds with Maven are controlled by [properties](http://maven.apache.org/pom.html#Properties) and <a href="http://maven.apache.org/guides/introduction/introduction-to-profiles.html">build profiles</a>.
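
For orientation, the documented snippet lands inside the plugin's `<configuration>` element; a sketch of the surrounding layout, assuming the standard `scala-maven-plugin` declaration in the affected module's `pom.xml`:

~~~xml
<plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <configuration>
        <args>
            <arg>-Xmax-classfile-name</arg>
            <arg>128</arg>
        </args>
    </configuration>
</plugin>
~~~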


[5/5] flink git commit: [FLINK-2619] [tests] Fix failing ExecutionGraphRestartTest and JobManagerRegistrationTest

Posted by se...@apache.org.
[FLINK-2619] [tests] Fix failing ExecutionGraphRestartTest and JobManagerRegistrationTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95d035ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95d035ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95d035ab

Branch: refs/heads/master
Commit: 95d035ab363d13a53ec37894678c9a6a4896e9dd
Parents: c9edd9a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 9 16:00:25 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 19:10:51 2015 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  31 +--
 .../executiongraph/ExecutionGraphTestUtils.java |   4 +
 .../ExecutionGraphRestartTest.scala             |  34 ++--
 .../jobmanager/JobManagerRegistrationTest.scala | 187 +++++++++----------
 4 files changed, 138 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cde1741..a44fc82 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -857,20 +857,25 @@ public class ExecutionGraph implements Serializable {
 					else if (current == JobStatus.FAILING) {
 						if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
 							numberOfRetriesLeft--;
-							future(new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									try {
-										LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying);
-										Thread.sleep(delayBeforeRetrying);
+							
+							if (delayBeforeRetrying > 0) {
+								future(new Callable<Object>() {
+									@Override
+									public Object call() throws Exception {
+										try {
+											LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying);
+											Thread.sleep(delayBeforeRetrying);
+										}
+										catch(InterruptedException e){
+											// should only happen on shutdown
+										}
+										restart();
+										return null;
 									}
-									catch(InterruptedException e){
-										// should only happen on shutdown
-									}
-									restart();
-									return null;
-								}
-							}, executionContext);
+								}, executionContext);
+							} else {
+								restart();
+							}
 							break;
 						}
 						else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
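
The key behavioral change above: with a zero retry delay, the restart now happens synchronously instead of being bounced through the executor, so tests driving the graph with a direct execution context observe the state transitions immediately. A sketch of the control flow, with hypothetical names:

~~~scala
import scala.concurrent.{ExecutionContext, Future}

// Only schedule the sleep-then-restart on the executor when a positive
// delay is configured; with no delay, restart in the calling thread.
def scheduleRestart(delayMillis: Long, restart: () => Unit)
                   (implicit ec: ExecutionContext): Unit = {
  if (delayMillis > 0) {
    Future {
      try Thread.sleep(delayMillis)
      catch { case _: InterruptedException => () } // expected on shutdown
      restart()
    }
  } else {
    restart()
  }
}
~~~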

http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 64d4c44..ad30b9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -112,7 +112,9 @@ public class ExecutionGraphTestUtils {
 		return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
 	}
 
+	@SuppressWarnings("serial")
 	public static class SimpleActorGateway extends BaseTestingActorGateway {
+		
 		public TaskDeploymentDescriptor lastTDD;
 
 		public SimpleActorGateway(ExecutionContext executionContext){
@@ -139,7 +141,9 @@ public class ExecutionGraphTestUtils {
 		}
 	}
 
+	@SuppressWarnings("serial")
 	public static class SimpleFailingActorGateway extends BaseTestingActorGateway {
+
 		public SimpleFailingActorGateway(ExecutionContext executionContext) {
 			super(executionContext);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index 434a8cb..d1b8fac 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -26,10 +26,13 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
+import scala.collection.JavaConverters._
+
 @RunWith(classOf[JUnitRunner])
 class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
 
@@ -39,7 +42,8 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
     "be manually restartable" in {
       try {
         val instance = ExecutionGraphTestUtils.getInstance(
-          new SimpleActorGateway(TestingUtils.directExecutionContext))
+          new SimpleActorGateway(TestingUtils.directExecutionContext),
+            NUM_TASKS)
 
         val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
         scheduler.newInstanceAvailable(instance)
@@ -65,14 +69,18 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         eg.getState should equal(JobStatus.RUNNING)
 
         eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
+        
+        for (vertex <- eg.getAllExecutionVertices().asScala) {
+          vertex.getCurrentExecutionAttempt().cancelingComplete()
+        }
+        
         eg.getState should equal(JobStatus.FAILED)
 
         eg.restart()
         eg.getState should equal(JobStatus.RUNNING)
-
-        import collection.JavaConverters._
+        
         for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.executionFinished()
+          vertex.getCurrentExecutionAttempt().markFinished()
         }
 
         eg.getState should equal(JobStatus.FINISHED)
@@ -86,7 +94,8 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
     "restart itself automatically" in {
       try {
         val instance = ExecutionGraphTestUtils.getInstance(
-          new SimpleActorGateway(TestingUtils.directExecutionContext))
+          new SimpleActorGateway(TestingUtils.directExecutionContext),
+          NUM_TASKS)
 
         val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
         scheduler.newInstanceAvailable(instance)
@@ -112,15 +121,19 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         eg.getState should equal(JobStatus.RUNNING)
 
         eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-
+        eg.getState should equal(JobStatus.FAILING)
+        
+        for (vertex <- eg.getAllExecutionVertices.asScala) {
+          vertex.getCurrentExecutionAttempt().cancelingComplete()
+        }
+        
         eg.getState should equal(JobStatus.RUNNING)
-
-        import collection.JavaConverters._
+        
         for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.executionFinished()
+          vertex.getCurrentExecutionAttempt().markFinished()
         }
 
-        eg.getState should equal(JobStatus.FINISHED)
+        eg.getState() should equal(JobStatus.FINISHED)
       } catch {
         case t: Throwable =>
           t.printStackTrace()
@@ -128,5 +141,4 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 7487670..ea691f1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -19,18 +19,21 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.net.InetAddress
-import java.util.UUID
 
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit}
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance._
+import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
+
 import org.junit.Assert.{assertNotEquals, assertNotNull}
 import org.junit.runner.RunWith
+
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
@@ -56,123 +59,119 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "assign a TaskManager a unique instance ID" in {
       val jm = startTestingJobManager(_system)
-
-      val tmDummy1 = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
-      val tmDummy2 = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
-
-      try {
-        val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
-        val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
-
-        val hardwareDescription = HardwareDescription.extractFromSystem(10)
-
-        val leaderSessionID = UUID.randomUUID()
-
-        var id1: InstanceID = null
-        var id2: InstanceID = null
-
-        // task manager 1
-        within(1 second) {
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo1,
-              hardwareDescription,
-              1),
-            tmDummy1)
-
-          val response = receiveOne(1 second)
-          response match {
-            case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id1 = id
-            case _ => fail("Wrong response message: " + response)
-          }
+      
+      val tm1 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
+      val tm2 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
+      
+      val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
+      val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
+
+      val hardwareDescription = HardwareDescription.extractFromSystem(10)
+      
+      var id1: InstanceID = null
+      var id2: InstanceID = null
+
+      // task manager 1
+      within(1 second) {
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo1,
+            hardwareDescription,
+            1),
+          new AkkaActorGateway(tm1, null))
+
+        val response = expectMsgType[LeaderSessionMessage]
+        response match {
+          case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 = id
+          case _ => fail("Wrong response message: " + response)
         }
+      }
 
-        // task manager 2
-        within(1 second) {
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo2,
-              hardwareDescription,
-              1),
-            tmDummy2)
-
-          val response = receiveOne(1 second)
-          response match {
-            case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id
-            case _ => fail("Wrong response message: " + response)
-          }
+      // task manager 2
+      within(1 second) {
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo2,
+            hardwareDescription,
+            1),
+          new AkkaActorGateway(tm2, null))
+
+        val response = expectMsgType[LeaderSessionMessage]
+        response match {
+          case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id
+          case _ => fail("Wrong response message: " + response)
         }
-
-        assertNotNull(id1)
-        assertNotNull(id2)
-        assertNotEquals(id1, id2)
-      }
-      finally {
-        tmDummy1 ! Kill
-        tmDummy2 ! Kill
-        jm ! Kill
       }
+
+      assertNotNull(id1)
+      assertNotNull(id2)
+      assertNotEquals(id1, id2)
     }
 
     "handle repeated registration calls" in {
 
       val jm = startTestingJobManager(_system)
-      val tmDummy = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
-
-      try {
-        val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
-        val hardwareDescription = HardwareDescription.extractFromSystem(10)
-
-        within(1 second) {
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo,
-              hardwareDescription,
-              1),
-            tmDummy)
-
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo,
-              hardwareDescription,
-              1),
-            tmDummy)
-
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo,
-              hardwareDescription,
-              1),
-            tmDummy)
-
-          expectMsgType[AcknowledgeRegistration]
-          expectMsgType[AlreadyRegistered]
-          expectMsgType[AlreadyRegistered]
+      val selfGateway = new AkkaActorGateway(testActor, null)
+      
+      val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
+      val hardwareDescription = HardwareDescription.extractFromSystem(10)
+
+      within(5 second) {
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo,
+            hardwareDescription,
+            1),
+          selfGateway)
+
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo,
+            hardwareDescription,
+            1),
+          selfGateway)
+
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo,
+            hardwareDescription,
+            1),
+          selfGateway)
+
+        expectMsgType[LeaderSessionMessage] match {
+          case LeaderSessionMessage(null, AcknowledgeRegistration(_, _)) =>
+          case m => fail("Wrong message type: " + m)
+        }
+
+        expectMsgType[LeaderSessionMessage] match {
+          case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
+          case m => fail("Wrong message type: " + m)
+        }
+
+        expectMsgType[LeaderSessionMessage] match {
+          case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
+          case m => fail("Wrong message type: " + m)
         }
-      } finally {
-        tmDummy ! Kill
-        jm ! Kill
       }
     }
   }
 
-  private def startTestingJobManager(system: ActorSystem): ActorRef = {
+  private def startTestingJobManager(system: ActorSystem): ActorGateway = {
     val (jm: ActorRef, _) = JobManager.startJobManagerActors(
       new Configuration(),
       _system,
       None,
       None,
       StreamingMode.BATCH_ONLY)
-    jm
+    new AkkaActorGateway(jm, null)
   }
 }
 
 object JobManagerRegistrationTest {
-
-  /** Simply dummy actor that swallows all messages */
-  class DummyActor extends Actor {
+  
+  class PlainForwardingActor(private val target: ActorRef) extends Actor {
     override def receive: Receive = {
-      case _ =>
+      case message => target.forward(message)
     }
   }
 }
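
The rewrite replaces message-swallowing dummy actors with actors that forward to the TestKit's `testActor`, so assertions like `expectMsgType` see the JobManager's replies. A sketch of the forwarding pattern, with a hypothetical class name:

~~~scala
import akka.actor.{Actor, ActorRef}

// Forwards every message to a target actor via `forward`, which preserves
// the original sender; pointing `target` at a TestKit's testActor lets
// expectMsgType assertions observe replies addressed to this actor.
class Forwarder(target: ActorRef) extends Actor {
  override def receive: Receive = {
    case message => target.forward(message)
  }
}
~~~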