You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:45 UTC

[49/82] [abbrv] incubator-flink git commit: Removed dead instance cleanup from InstanceManager so that Akka's watch mechanism is the current mean to detect dead instances.

Removed dead instance cleanup from InstanceManager so that Akka's watch mechanism is the current mean to detect dead instances.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8d414d7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8d414d7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8d414d7e

Branch: refs/heads/master
Commit: 8d414d7eb534bf5defa4aadd9b4f0a12a0be7ca8
Parents: 8eadd3e
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 17 18:33:45 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |   1 +
 .../flink/configuration/ConfigConstants.java    |  10 +-
 flink-runtime/pom.xml                           |   6 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   3 +-
 .../apache/flink/runtime/instance/Instance.java |  27 +---
 .../flink/runtime/instance/InstanceManager.java |  85 +------------
 .../org/apache/flink/runtime/net/NetUtils.java  |   8 +-
 .../runtime/operators/RegularPactTask.java      |   1 -
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 125 ++++++++++++++-----
 .../flink/runtime/akka/KryoInitializer.scala    |  26 ++++
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  13 +-
 .../runtime/instance/InstanceManagerTest.java   |  79 ------------
 .../scheduler/SchedulerIsolatedTasksTest.java   |   6 -
 .../apache/flink/api/scala/ClosureCleaner.scala |   2 +-
 .../test/cancelling/CancellingTestBase.java     |   2 -
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../PartitionOperatorTranslationTest.scala      |   2 +-
 .../scala/functions/ClosureCleanerITCase.scala  |   2 +-
 .../misc/MassiveCaseClassSortingITCase.scala    |   2 +-
 .../CoGroupCustomPartitioningTest.scala         |   2 +-
 .../CoGroupGroupSortTranslationTest.scala       |   2 +-
 ...tomPartitioningGroupingKeySelectorTest.scala |   2 +-
 .../CustomPartitioningGroupingPojoTest.scala    |   2 +-
 .../CustomPartitioningGroupingTupleTest.scala   |   2 +-
 .../translation/CustomPartitioningTest.scala    |   2 +-
 .../JoinCustomPartitioningTest.scala            |   2 +-
 .../translation/PartitioningTestClasses.scala   |  12 +-
 .../scala/runtime/CaseClassComparatorTest.scala |   2 +-
 pom.xml                                         |   6 +
 30 files changed, 180 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 45848c2..c4e71ef 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -54,6 +54,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f0ab180..ab1ed78 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -578,17 +578,17 @@ public final class ConfigConstants {
 	
 	// ------------------------------ Akka Values ------------------------------
 
-	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 ms";
+	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "5000 ms";
 
-	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "10 s";
+	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "100 s";
 
 	public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
 
-	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "1000 ms";
+	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";
 
-	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "10 s";
+	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "100 s";
 
-	public static double DEFAULT_AKKA_WATCH_THRESHOLD = 10.0;
+	public static double DEFAULT_AKKA_WATCH_THRESHOLD = 300.0;
 
 	public static String DEFAULT_AKKA_TCP_TIMEOUT = "15 s";
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f04475c..b7edf7a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -123,6 +123,11 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>com.github.romix.akka</groupId>
+			<artifactId>akka-kryo-serialization_2.10</artifactId>
+		</dependency>
+
+		<dependency>
 			<groupId>org.scalatest</groupId>
 			<artifactId>scalatest_2.10</artifactId>
 		</dependency>
@@ -289,7 +294,6 @@ under the License.
 					<systemPropertyVariables>
 						<log.level>WARN</log.level>
 					</systemPropertyVariables>
-					<reuseForks>false</reuseForks>
 				</configuration>
 			</plugin>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8faa235..7d38eac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -58,7 +59,7 @@ import org.apache.flink.util.ExceptionUtils;
 import static akka.dispatch.Futures.future;
 
 
-public class ExecutionGraph {
+public class ExecutionGraph implements Serializable {
 
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 18d3212..aaa276d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -116,32 +116,7 @@ public class Instance {
 	public boolean isAlive() {
 		return !isDead;
 	}
-	
-	public void stopInstance() {
-		try {
-			final TaskOperationProtocol tmProxy = this.getTaskManagerProxy();
-			// start a thread for stopping the TM to avoid infinitive blocking.
-			Runnable r = new Runnable() {
-				@Override
-				public void run() {
-					try {
-						tmProxy.killTaskManager();
-					} catch (IOException e) {
-						if (Log.isDebugEnabled()) {
-							Log.debug("Error while stopping TaskManager", e);
-						}
-					}
-				}
-			};
-			Thread t = new Thread(r);
-			t.setDaemon(true); // do not prevent the JVM from stopping
-			t.start();
-		} catch (Exception e) {
-			if (Log.isDebugEnabled()) {
-				Log.debug("Error while stopping TaskManager", e);
-			}
-		}
-	}
+
 	public void markDead() {
 		if (isDead) {
 			return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 10c89e4..3ce3ac7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -22,12 +22,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.ConfigConstants;
@@ -97,23 +94,11 @@ public class InstanceManager {
 		this.registeredHostsByConnection = new HashMap<ActorRef, Instance>();
 		this.deadHosts = new HashSet<ActorRef>();
 		this.heartbeatTimeout = heartbeatTimeout;
-
-		new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
 	}
 	
 	public long getHeartbeatTimeout() {
 		return heartbeatTimeout;
 	}
-	
-	/**
-	 * This method is only used by the Flink YARN client to self-destruct a Flink cluster
-	 * by stopping the JVMs of the TaskManagers.
-	 */
-	public void killTaskManagers() {
-		for (Instance i : this.registeredHostsById.values()) {
-			i.stopInstance();
-		}
-	}
 
 	public void shutdown() {
 		synchronized (this.lock) {
@@ -122,8 +107,6 @@ public class InstanceManager {
 			}
 			this.shutdown = true;
 
-			this.cleanupStaleMachines.cancel();
-
 			for (Instance i : this.registeredHostsById.values()) {
 				i.markDead();
 			}
@@ -213,7 +196,7 @@ public class InstanceManager {
 
 		if(host != null){
 			registeredHostsByConnection.remove(taskManager);
-			registeredHostsById.remove(taskManager);
+			registeredHostsById.remove(host.getId());
 			deadHosts.add(taskManager);
 
 			host.markDead();
@@ -221,6 +204,10 @@ public class InstanceManager {
 			totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
 
 			notifyDeadInstance(host);
+
+			LOG.info("Unregistered task manager " + taskManager.path().address() + ". Number of " +
+					"registered task managers " + getNumberOfRegisteredTaskManagers() + ". Number" +
+					" of available slots " + getTotalNumberOfSlots() + ".");
 		}
 	}
 
@@ -272,70 +259,10 @@ public class InstanceManager {
 			for (InstanceListener listener : this.instanceListeners) {
 				try {
 					listener.instanceDied(instance);
-				}
-				catch (Throwable t) {
+				} catch (Throwable t) {
 					LOG.error("Notification of dead instance failed.", t);
 				}
 			}
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void checkForDeadInstances() {
-		final long now = System.currentTimeMillis();
-		final long timeout = InstanceManager.this.heartbeatTimeout;
-		
-		synchronized (InstanceManager.this.lock) {
-			if (InstanceManager.this.shutdown) {
-				return;
-			}
-
-			final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
-			
-			// check all hosts whether they did not send heart-beat messages.
-			while (entries.hasNext()) {
-				
-				final Map.Entry<InstanceID, Instance> entry = entries.next();
-				final Instance host = entry.getValue();
-				
-				if (!host.isStillAlive(now, timeout)) {
-					
-					// remove from the living
-					entries.remove();
-					registeredHostsByConnection.remove(host.getTaskManager());
-
-					// add to the dead
-					deadHosts.add(host.getTaskManager());
-					
-					host.markDead();
-					
-					totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
-					
-					LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
-							host.getId(), host.getPath(), heartbeatTimeout, registeredHostsById.size()));
-					
-					// report to all listeners
-					notifyDeadInstance(host);
-				}
-			}
-		}
-	}
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Periodic task that checks whether hosts have not sent their heart-beat
-	 * messages and purges the hosts in this case.
-	 */
-	private final TimerTask cleanupStaleMachines = new TimerTask() {
-		@Override
-		public void run() {
-			try {
-				checkForDeadInstances();
-			}
-			catch (Throwable t) {
-				LOG.error("Checking for dead instances failed.", t);
-			}
-		}
-	};
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 2cd929b..2795158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -367,6 +367,9 @@ public class NetUtils {
 							}
 							break;
 						case HEURISTIC:
+							LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
+									"isLinkLocalAddress:" + i.isLinkLocalAddress() +" " +
+									"isLoopbackAddress:" + i.isLoopbackAddress() + ".");
 							if(!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
 								LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
 										"loopback address. Using instead " + i.getHostAddress() + " on network " +
@@ -389,13 +392,16 @@ public class NetUtils {
 					break;
 				case SLOW_CONNECT:
 					if(!InetAddress.getLocalHost().isLoopbackAddress()){
+						LOG.info("Heuristically taking " + InetAddress.getLocalHost() + " as own " +
+								"IP address.");
 						return InetAddress.getLocalHost();
 					}else {
 						strategy = AddressDetectionState.HEURISTIC;
 						break;
 					}
 				case HEURISTIC:
-					throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"').");
+					throw new RuntimeException("Unable to resolve own inet address by connecting " +
+							"to address (" + jobManagerAddress + ").");
 			}
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Defaulting to detection strategy " + strategy);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 5f520e3..9ea4a74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1303,7 +1303,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 					final TypeComparator<T> comparator = compFactory.createComparator();
 					oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist);
 				}
-
 				writers.add(new RecordWriter<SerializationDelegate<T>>(task, oe));
 			}
 			if (eventualOutputs != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index f931497..168dccb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -78,42 +78,105 @@ object AkkaUtils {
     val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
       ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
 
-    val configString = s"""akka.remote.transport-failure-detector.heartbeat-interval =
-                       $transportHeartbeatInterval
-       |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $transportHeartbeatPause
-       |akka.remote.transport-failure-detector.threshold = $transportThreshold
-       |akka.remote.watch-failure-detector.heartbeat-interval = $watchHeartbeatInterval
-       |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $watchHeartbeatPause
-       |akka.remote.wathc-failure-detector.threshold = $watchThreshold
-       |akka.remote.netty.tcp.hostname = $host
-       |akka.remote.netty.tcp.port = $port
-       |akka.remote.netty.tcp.connection-timeout = $akkaTCPTimeout
-       |akka.remote.netty.tcp.maximum-frame-size = $akkaFramesize
-       |akka.actor.default-dispatcher.throughput = $akkaThroughput
-       |akka.remote.log-remote-lifecycle-events = $logLifecycleEvents
-       |akka.log-dead-letters = $logLifecycleEvents
-       |akka.log-dead-letters-during-shutdown = $logLifecycleEvents
-       |akka.loglevel = "$logLevel"
-       |akka.stdout-loglevel = "$logLevel"
-     """.stripMargin
+    val configString =
+      s"""
+         |akka {
+         |  loglevel = "$logLevel"
+         |  stdout-loglevel = "$logLevel"
+         |
+         |  log-dead-letters = $logLifecycleEvents
+         |  log-dead-letters-during-shutdown = $logLifecycleEvents
+         |
+         |  extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$$"]
+         |
+         |  remote {
+         |    transport-failure-detector{
+         |      acceptable-heartbeat-pause = $transportHeartbeatPause
+         |      threshold = $transportThreshold
+         |      heartbeat-interval = $transportHeartbeatInterval
+         |    }
+         |
+         |    watch-failure-detector{
+         |      heartbeat-interval = $watchHeartbeatInterval
+         |      acceptable-heartbeat-pause = $watchHeartbeatPause
+         |      threshold = $watchThreshold
+         |    }
+         |
+         |    netty{
+         |      tcp{
+         |        hostname = $host
+         |        port = $port
+         |        connection-timeout = $akkaTCPTimeout
+         |        maximum-frame-size = $akkaFramesize
+         |      }
+         |    }
+         |
+         |    log-remote-lifecycle-events = $logLifecycleEvents
+         |
+         |  }
+         |
+         |  actor{
+         |    default-dispatcher{
+         |      throughput = $akkaThroughput
+         |    }
+         |
+         |    kryo{
+         |      type = "nograph"
+         |      idstrategy = "default"
+         |      serializer-pool-size = 16
+         |      buffer-size = 4096
+         |      max-buffer-size = -1
+         |      use-manifests = false
+         |      compression = off
+         |      implicit-registration-logging = true
+         |      kryo-trace = true
+         |      kryo-custom-serializer-init = "org.apache.flink.runtime.akka.KryoInitializer"
+         |    }
+         |
+         |    serialize-messages = on
+         |
+         |    serializers{
+         |      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
+         |    }
+         |
+         |    serialization-bindings {
+         |    }
+         |  }
+         |}
+       """.stripMargin
 
     getDefaultActorSystemConfigString + configString
   }
 
   def getDefaultActorSystemConfigString: String = {
-    s"""akka.daemonic = on
-      |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
-      |akka.loglevel = "WARNING"
-      |akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-      |akka.stdout-loglevel = "WARNING"
-      |akka.jvm-exit-on-fatal-error = off
-      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
-      |akka.remote.netty.tcp.tcp-nodelay = on
-      |akka.log-config-on-start = off
-      |akka.remote.netty.tcp.port = 0
-      |akka.remote.netty.tcp.maximum-frame-size = 1MB
-    """.stripMargin
+    s"""
+       |akka {
+       |  daemonic = on
+       |
+       |  loggers = ["akka.event.slf4j.Slf4jLogger"]
+       |  loglevel = "WARNING"
+       |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+       |  stdout-loglevel = "WARNING"
+       |  jvm-exit-on-fatal-error = off
+       |  log-config-on-start = off
+       |
+       |  actor {
+       |    provider = "akka.remote.RemoteActorRefProvider"
+       |  }
+       |
+       |  remote{
+       |    netty{
+       |      tcp{
+       |        transport-class = "akka.remote.transport.netty.NettyTransport"
+       |        tcp-nodelay = on
+       |
+       |        port = 0
+       |        maximum-frame-size = 1MB
+       |      }
+       |    }
+       |  }
+       |}
+     """.stripMargin
   }
 
   def getDefaultActorSystemConfig = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
new file mode 100644
index 0000000..5f9854b
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka
+
+import com.esotericsoftware.kryo.Kryo
+
+class KryoInitializer {
+  def cystomize(kryo: Kryo): Unit = {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a72c685..a18240e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -108,7 +108,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
         hardwareInformation, numberOfSlots)
 
       // to be notified when the taskManager is no longer reachable
-      context.watch(taskManager);
+//      context.watch(taskManager);
 
       taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
     }
@@ -381,7 +381,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
     case Terminated(taskManager) => {
       log.info(s"Task manager ${taskManager.path} terminated.")
       instanceManager.unregisterTaskManager(taskManager)
-      context.unwatch(taskManager)
+//      context.unwatch(taskManager)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a145689..261d50a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -79,7 +79,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   val REGISTRATION_DELAY = 0 seconds
   val REGISTRATION_INTERVAL = 10 seconds
   val MAX_REGISTRATION_ATTEMPTS = 10
-  val HEARTBEAT_INTERVAL = 1000 millisecond
+  val HEARTBEAT_INTERVAL = 5000 millisecond
 
   TaskManager.checkTempDirs(tmpDirPaths)
   val ioManager = new IOManagerAsync(tmpDirPaths)
@@ -185,7 +185,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
         currentJobManager = sender()
         instanceID = id
 
-        context.watch(currentJobManager)
+//        context.watch(currentJobManager)
 
         log.info(s"TaskManager successfully registered at JobManager ${
           currentJobManager.path
@@ -232,9 +232,18 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       val taskIndex = tdd.getIndexInSubtaskGroup
       val numSubtasks = tdd.getCurrentNumberOfSubtasks
       var jarsRegistered = false
+      var startRegisteringTask = 0L
 
       try {
+        if(log.isDebugEnabled){
+          startRegisteringTask = System.currentTimeMillis()
+        }
         libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles());
+
+        if(log.isDebugEnabled){
+          log.debug(s"Register task ${executionID} took ${(System.currentTimeMillis() -
+            startRegisteringTask)/1000.0}s")
+        }
         jarsRegistered = true
 
         val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 1f63588..8a89503 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -235,83 +235,4 @@ public class InstanceManagerTest{
 			Assert.fail("Test erroneous: " + e.getMessage());
 		}
 	}
-
-	/**
-	 * This test checks the clean-up routines of the cluster manager.
-	 */
-	@Test
-	public void testCleanUp() {
-		try {
-			InstanceManager cm = new InstanceManager(200, 100);
-
-			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, 20000);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, 20001);
-
-			JavaTestKit probe1 = new JavaTestKit(system);
-			JavaTestKit probe2 = new JavaTestKit(system);
-			// register two instances
-			InstanceID i1 = cm.registerTaskManager(probe1.getRef(), ici1, resources, 1);
-			InstanceID i2 = cm.registerTaskManager(probe2.getRef(), ici2, resources, 1);
-
-			assertNotNull(i1);
-			assertNotNull(i2);
-			
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			// report a few heatbeats for both of the machines (each 50 msecs)...
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-				assertTrue(cm.reportHeartBeat(i2));
-			}
-			
-			// all should be alive
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			// report a few heatbeats for both only one machine
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-			}
-			
-			// we should have lost one TM by now
-			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(1, cm.getTotalNumberOfSlots());
-			
-			// if the lost TM reports, it should not be accepted
-			assertFalse(cm.reportHeartBeat(i2));
-			
-			// allow the lost TM to re-register itself
-			i2 = cm.registerTaskManager(probe2.getRef(), ici2, resources, 1);
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-			
-			// report a few heatbeats for both of the machines (each 50 msecs)...
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-				assertTrue(cm.reportHeartBeat(i2));
-			}
-			
-			// all should be alive
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 240cdac..9418d77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -38,14 +38,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
 
 /**
  * Tests for the {@link Scheduler} when scheduling individual tasks.
@@ -284,9 +281,6 @@ public class SchedulerIsolatedTasksTest {
 			// the slots should all be different
 			assertTrue(areAllDistinct(slotsAfter.toArray()));
 			
-			executor.shutdown();
-			executor.awaitTermination(30, TimeUnit.SECONDS);
-			
 			assertEquals(totalSlots, scheduler.getNumberOfAvailableSlots());
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
index a3c564a..9740c82 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 303ee3d..63ca29d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -121,8 +121,6 @@ public abstract class CancellingTestBase {
 			actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling,
 							TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
 					actorSystem.dispatcher(), ActorRef.noSender());
-						case RESTARTING:
-							throw new IllegalStateException("Job restarted");
 
 			try {
 				Await.result(result, AkkaUtils.DEFAULT_TIMEOUT());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 0b686e5..2c2d4ff 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -17,7 +17,7 @@
 ################################################################################
 
 # Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
+log4j.rootLogger=DEBUG, A1
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
index 1e44413..a83d728 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index a063957..0d6b763 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index d09fe60..a34c7d8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
index 1c6afba..bd254fe 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
index 7304310..9535173 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
index 17ecc3f..93e3593 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
index 8ffba8e..04a6285 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
index b5f266f..e5b6c5f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index d4e438f..1dcf181 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
index 8cb49b8..d83d3be 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
index bcf1869..2a25be4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
-import org.junit.Assert._
-import org.junit.Test
 import org.apache.flink.api.common.functions.Partitioner
-import org.apache.flink.api.scala._
-import org.apache.flink.test.compiler.util.CompilerTestBase
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType
-import org.apache.flink.compiler.plan.SingleInputPlanNode
-import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.compiler.plan.DualInputPlanNode
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
index 24dbfe5..d150e85 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58bf3db..6739948 100644
--- a/pom.xml
+++ b/pom.xml
@@ -282,6 +282,12 @@ under the License.
 			</dependency>
 
 			<dependency>
+				<groupId>com.github.romix.akka</groupId>
+				<artifactId>akka-kryo-serialization_2.10</artifactId>
+				<version>0.3.2</version>
+			</dependency>
+
+			<dependency>
 				<groupId>org.scalatest</groupId>
 				<artifactId>scalatest_2.10</artifactId>
 				<version>2.2.2</version>