You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/30 20:45:47 UTC

[1/4] flink git commit: [FLINK-2543] [core] Make exception communication and result/failure notifications consistent with respect to serialization of exceptions

Repository: flink
Updated Branches:
  refs/heads/master 554b77bcd -> 0ba53558f


http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 763885c..c2ab424 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,8 +67,7 @@ public class ClusterUtil {
 				exec.submitJobDetached(jobGraph);
 				return null;
 			} else {
-				SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, printDuringExecution);
-				return result.toJobExecutionResult(ClusterUtil.class.getClassLoader());
+				return exec.submitJobAndWait(jobGraph, printDuringExecution);
 			}
 		} finally {
 			if (exec != null && !detached) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index ae31d95..ccaa486 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -20,10 +20,10 @@ package org.apache.flink.test.accumulators;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.Status;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -55,11 +55,14 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -190,7 +193,7 @@ public class AccumulatorLiveITCase {
 			// submit job
 
 			jobManagerGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, false), selfGateway);
-			expectMsgClass(TIMEOUT, Status.Success.class);
+			expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
 
 
 			TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index e43a9e4..5c2f2dc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -26,11 +26,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 public class ClassLoaderITCase {
 
 	private static final String INPUT_SPLITS_PROG_JAR_FILE = "target/customsplit-test-jar.jar";
@@ -47,7 +49,6 @@ public class ClassLoaderITCase {
 	@Test
 	public void testJobsWithCustomClassLoader() {
 		try {
-
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
@@ -62,37 +63,45 @@ public class ClassLoaderITCase {
 			try {
 				int port = testCluster.getJobManagerRPCPort();
 
-				PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
+				PackagedProgram inputSplitTestProg = new PackagedProgram(
+						new File(INPUT_SPLITS_PROG_JAR_FILE),
 						new String[] { INPUT_SPLITS_PROG_JAR_FILE,
 										"localhost",
 										String.valueOf(port),
 										"4" // parallelism
-									} );
+									});
 				inputSplitTestProg.invokeInteractiveModeForExecution();
 
 				// regular streaming job
-				PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE),
-						new String[] { STREAMING_PROG_JAR_FILE, "localhost", String.valueOf(port) } );
+				PackagedProgram streamingProg = new PackagedProgram(
+						new File(STREAMING_PROG_JAR_FILE),
+						new String[] { 
+								STREAMING_PROG_JAR_FILE,
+								"localhost",
+								String.valueOf(port)
+						});
 				streamingProg.invokeInteractiveModeForExecution();
 
 				// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
 				// the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
 				try {
-					PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE),
-							new String[]{STREAMING_CHECKPOINTED_PROG_JAR_FILE, "localhost", String.valueOf(port)});
+					PackagedProgram streamingCheckpointedProg = new PackagedProgram(
+							new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE),
+							new String[] {
+									STREAMING_CHECKPOINTED_PROG_JAR_FILE, 
+									"localhost",
+									String.valueOf(port)});
 					streamingCheckpointedProg.invokeInteractiveModeForExecution();
-				} catch(Exception e) {
+				}
+				catch (Exception e) {
 					// we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
-					try {
-						if (!(e.getCause().getCause().getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"))) {
-							throw e;
-						}
-					} catch(Throwable ignore) {
-						throw e;
-					}
+					assertEquals("Program should terminate with a 'SuccessException'",
+							"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException",
+							e.getCause().getCause().getClass().getCanonicalName());
 				}
 
-				PackagedProgram kMeansProg = new PackagedProgram(new File(KMEANS_JAR_PATH),
+				PackagedProgram kMeansProg = new PackagedProgram(
+						new File(KMEANS_JAR_PATH),
 						new String[] { KMEANS_JAR_PATH,
 										"localhost",
 										String.valueOf(port),
@@ -100,7 +109,7 @@ public class ClassLoaderITCase {
 										KMeansData.DATAPOINTS,
 										KMeansData.INITIAL_CENTERS,
 										"25"
-									} );
+									});
 				kMeansProg.invokeInteractiveModeForExecution();
 			}
 			finally {
@@ -109,8 +118,7 @@ public class ClassLoaderITCase {
 		}
 		catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail(e.getMessage());
+			fail(e.getMessage());
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index feb90b2..577b5c6 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,9 @@ package org.apache.flink.api.scala.runtime.jobmanager
 import akka.actor.Status.Success
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+
 import org.junit.runner.RunWith
+
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
@@ -57,7 +59,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
       val cluster = startDeathwatchCluster(num_slots, 1)
 
       val tm = cluster.getTaskManagers(0)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       // disable disconnect message to test death watch
       tm ! DisableDisconnect
@@ -76,7 +78,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
         cluster.waitForTaskManagersToBeRegistered()
 
-        cluster.getJobManagerGateway.tell(RequestNumberRegisteredTaskManager, self)
+        cluster.getJobManagerGateway().tell(RequestNumberRegisteredTaskManager, self)
 
         expectMsg(1)
       } finally {
@@ -99,13 +101,13 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
       val cluster = startDeathwatchCluster(num_slots / 2, 2)
 
-      var jmGateway = cluster.getJobManagerGateway
+      var jmGateway = cluster.getJobManagerGateway()
       val tm = cluster.getTaskManagers(0)
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           tm.tell(NotifyWhenJobManagerTerminated(jmGateway.actor()), self)
 
@@ -115,13 +117,13 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
           cluster.restartJobManager()
 
-          jmGateway = cluster.getJobManagerGateway
+          jmGateway = cluster.getJobManagerGateway()
 
           cluster.waitForTaskManagersToBeRegistered()
 
           jmGateway.tell(SubmitJob(jobGraph2, false), self)
 
-          val failure = expectMsgType[Success]
+          expectMsg(JobSubmitSuccess(jobGraph2.getJobID()))
 
           val result = expectMsgType[JobResultSuccess]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 1952760..a37fae3 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{NoOpInvokable, BlockingNoOpInvokable, BlockingReceiver, Sender}
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, RequestNumberRegisteredTaskManager, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobManager, NotifyWhenRegisteredAtJobManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
@@ -63,7 +63,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val cluster = startDeathwatchCluster(num_slots, 2)
 
       val taskManagers = cluster.getTaskManagers
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       jmGateway.tell(DisableDisconnect)
 
@@ -103,12 +103,12 @@ class TaskManagerFailsITCase(_system: ActorSystem)
 
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
 
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
 
@@ -126,8 +126,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
             case None => fail("Could not retrieve a working task manager.")
           }
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -156,12 +156,12 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
 
       val taskManagers = cluster.getTaskManagers
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
           expectMsg(AllVerticesRunning(jobID))
@@ -169,8 +169,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
           // kill one task manager
           taskManagers(0) ! Kill
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -199,17 +199,17 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val cluster = startDeathwatchCluster(num_slots/2, 2)
 
       var tm = cluster.getTaskManagers(0)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try{
         within(TestingUtils.TESTING_DURATION){
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           tm ! PoisonPill
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -227,7 +227,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
 
           jmGateway.tell(SubmitJob(jobGraph2, false), self)
 
-          expectMsgType[Success]
+          expectMsg(JobSubmitSuccess(jobGraph2.getJobID()))
 
           val result = expectMsgType[JobResultSuccess]
           result.result.getJobId() should equal(jobGraph2.getJobID)


[2/4] flink git commit: [FLINK-2543] [core] Make exception communication and result/failure notifications consistent with respect to serialization of exceptions

Posted by se...@apache.org.
[FLINK-2543] [core] Make exception communication and result/failure notifications consistent with respect to serialization of exceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ba53558
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ba53558
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ba53558

Branch: refs/heads/master
Commit: 0ba53558f9b56b1e17c84ab8e4ee639ca09b9133
Parents: bf8c8e5
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 28 21:05:20 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 30 18:21:39 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |   4 +-
 .../apache/flink/client/program/ClientTest.java |  18 +-
 .../flink/runtime/akka/FlinkUntypedActor.java   |   3 +-
 .../client/JobCancellationException.java        |   2 +-
 .../apache/flink/runtime/client/JobClient.java  | 156 +++++++++++-------
 .../flink/runtime/client/JobClientActor.java    |  30 ++--
 .../runtime/client/JobExecutionException.java   |  11 +-
 .../flink/runtime/executiongraph/Execution.java |   5 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  23 +--
 .../runtime/taskmanager/TaskExecutionState.java |  23 ++-
 .../flink/runtime/util/SerializedThrowable.java | 163 +++++++++++++------
 .../flink/runtime/jobmanager/JobManager.scala   |  68 ++++----
 .../messages/ExecutionGraphMessages.scala       |  20 ++-
 .../runtime/messages/JobManagerMessages.scala   |  14 ++
 .../runtime/minicluster/FlinkMiniCluster.scala  |  16 +-
 .../runtime/jobmanager/JobManagerTest.java      |   8 +-
 .../runtime/taskmanager/TaskCancelTest.java     |   3 +-
 .../taskmanager/TaskExecutionStateTest.java     |   3 +-
 .../runtime/util/SerializedThrowableTest.java   | 140 ++++++++++++++++
 .../jobmanager/CoLocationConstraintITCase.scala |   6 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  97 ++++++-----
 .../runtime/jobmanager/RecoveryITCase.scala     |  15 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |  10 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |  19 +--
 .../flink/streaming/util/ClusterUtil.java       |   5 +-
 .../accumulators/AccumulatorLiveITCase.java     |   7 +-
 .../test/classloading/ClassLoaderITCase.java    |  48 +++---
 .../jobmanager/JobManagerFailsITCase.scala      |  14 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  30 ++--
 29 files changed, 622 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 2f5d888..e90a39c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -425,10 +425,10 @@ public class Client {
 		try{
 			if (wait) {
 				return JobClient.submitJobAndWait(actorSystem,
-						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, this.userCodeClassLoader);
+						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader);
 			}
 			else {
-				JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout);
+				JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader);
 				// return a dummy execution result with the JobId
 				return new JobSubmissionResult(jobGraph.getJobID());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 594525e..46de93d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
+
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
@@ -39,15 +40,18 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.NetUtils;
-import org.junit.After;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
@@ -225,16 +229,17 @@ public class ClientTest {
 
 	public static class SuccessReturningActor extends FlinkUntypedActor {
 
-		private Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+		private final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
 
 		@Override
 		public void handleMessage(Object message) {
 			if (message instanceof JobManagerMessages.SubmitJob) {
 				JobID jid = ((JobManagerMessages.SubmitJob) message).jobGraph().getJobID();
 				getSender().tell(
-						decorateMessage(new Status.Success(jid)),
+						decorateMessage(new JobManagerMessages.JobSubmitSuccess(jid)),
 						getSelf());
-			} else if(message instanceof JobManagerMessages.RequestLeaderSessionID$) {
+			}
+			else if (message.getClass() == JobManagerMessages.getRequestLeaderSessionID().getClass()) {
 				getSender().tell(
 						decorateMessage(new JobManagerMessages.ResponseLeaderSessionID(leaderSessionID)),
 						getSelf());
@@ -254,12 +259,13 @@ public class ClientTest {
 
 	public static class FailureReturningActor extends FlinkUntypedActor {
 
-		private Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+		private Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
 
 		@Override
 		public void handleMessage(Object message) {
 			getSender().tell(
-					decorateMessage(new Status.Failure(new Exception("test"))),
+					decorateMessage(new JobManagerMessages.JobResultFailure(
+							new SerializedThrowable(new Exception("test")))),
 					getSelf());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 1456758..0623862 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -39,7 +39,8 @@ import java.util.UUID;
  * a leader session ID option which is returned by getLeaderSessionID.
  */
 public abstract class FlinkUntypedActor extends UntypedActor {
-	protected Logger LOG = LoggerFactory.getLogger(getClass());
+	
+	protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
 	/**
 	 * This method is called by Akka if a new message has arrived for the actor. It logs the

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
index 842870d..559d805 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
@@ -28,7 +28,7 @@ public class JobCancellationException extends JobExecutionException {
 
 	private static final long serialVersionUID = 2818087325120827526L;
 
-	public JobCancellationException(final JobID jobID, final String msg, final Throwable cause){
+	public JobCancellationException(JobID jobID, String msg, Throwable cause){
 		super(jobID, msg, cause);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 44d2c00..7507643 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -25,7 +25,7 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import com.google.common.base.Preconditions;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-
 import org.apache.flink.runtime.util.SerializedThrowable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +53,8 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.TimeoutException;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * The JobClient bridges between the JobManager's asynchronous actor messages and
  * the synchronous method calls to trigger.
@@ -87,7 +89,6 @@ public class JobClient {
 	 * @return The socket address of the JobManager actor system
 	 */
 	public static InetSocketAddress getJobManagerAddress(Configuration config) throws IOException {
-
 		String jobManagerAddress = config.getString(
 				ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 
@@ -131,10 +132,10 @@ public class JobClient {
 			boolean sysoutLogUpdates,
 			ClassLoader userCodeClassloader) throws JobExecutionException {
 
-		Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
-		Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
-		Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
-		Preconditions.checkNotNull(timeout, "The timeout must not be null.");
+		checkNotNull(actorSystem, "The actorSystem must not be null.");
+		checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
+		checkNotNull(jobGraph, "The jobGraph must not be null.");
+		checkNotNull(timeout, "The timeout must not be null.");
 
 		// for this job, we create a proxy JobClientActor that deals with all communication with
 		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
@@ -148,35 +149,15 @@ public class JobClient {
 				jobManagerGateway.leaderSessionID());
 
 		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
-
+		
+		// first block handles errors while waiting for the result
+		Object answer;
 		try {
 			Future<Object> future = Patterns.ask(jobClientActor,
 					new JobClientMessages.SubmitJobAndWait(jobGraph),
 					new Timeout(AkkaUtils.INF_TIMEOUT()));
-
-			Object answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
-
-			if (answer instanceof JobManagerMessages.JobResultSuccess) {
-				LOG.info("Job execution complete");
-
-				SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
-				if (result != null) {
-					return result.toJobExecutionResult(userCodeClassloader);
-				} else {
-					throw new Exception("Job was successfully executed but result contained a null JobExecutionResult.");
-				}
-			} else {
-				throw new Exception("Unknown answer after submitting the job: " + answer);
-			}
-		}
-		catch (JobExecutionException e) {
-			if(e.getCause() instanceof SerializedThrowable) {
-				SerializedThrowable serializedThrowable = (SerializedThrowable)e.getCause();
-				Throwable deserialized = serializedThrowable.deserializeError(userCodeClassloader);
-				throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed " + deserialized.getMessage(), deserialized);
-			} else {
-				throw e;
-			}
+			
+			answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
 		}
 		catch (TimeoutException e) {
 			throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. " +
@@ -190,6 +171,48 @@ public class JobClient {
 			// failsafe shutdown of the client actor
 			jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
 		}
+
+		// second block handles the actual response
+		if (answer instanceof JobManagerMessages.JobResultSuccess) {
+			LOG.info("Job execution complete");
+
+			SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
+			if (result != null) {
+				try {
+					return result.toJobExecutionResult(userCodeClassloader);
+				}
+				catch (Throwable t) {
+					throw new JobExecutionException(jobGraph.getJobID(),
+							"Job was successfully executed but JobExecutionResult could not be deserialized.");
+				}
+			}
+			else {
+				throw new JobExecutionException(jobGraph.getJobID(),
+						"Job was successfully executed but result contained a null JobExecutionResult.");
+			}
+		}
+		if (answer instanceof JobManagerMessages.JobResultFailure) {
+			LOG.info("Job execution failed");
+
+			SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
+			if (serThrowable != null) {
+				Throwable cause = serThrowable.deserializeError(userCodeClassloader);
+				if (cause instanceof JobExecutionException) {
+					throw (JobExecutionException) cause;
+				}
+				else {
+					throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
+				}
+			}
+			else {
+				throw new JobExecutionException(jobGraph.getJobID(),
+						"Job execution failed with null as failure cause.");
+			}
+		}
+		else {
+			throw new JobExecutionException(jobGraph.getJobID(),
+					"Unknown answer from JobManager after submitting the job: " + answer);
+		}
 	}
 
 	/**
@@ -203,31 +226,20 @@ public class JobClient {
 	public static void submitJobDetached(
 			ActorGateway jobManagerGateway,
 			JobGraph jobGraph,
-			FiniteDuration timeout) throws JobExecutionException {
-
-		Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
-		Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
-		Preconditions.checkNotNull(timeout, "The timeout must not be null.");
-
-		Future<Object> future = jobManagerGateway.ask(
-				new JobManagerMessages.SubmitJob(jobGraph, false),
-				timeout);
+			FiniteDuration timeout,
+			ClassLoader userCodeClassloader) throws JobExecutionException {
 
+		checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
+		checkNotNull(jobGraph, "The jobGraph must not be null.");
+		checkNotNull(timeout, "The timeout must not be null.");
+		
+		Object result;
 		try {
-			Object result = Await.result(future, timeout);
-			if (result instanceof JobID) {
-				JobID respondedID = (JobID) result;
-				if (!respondedID.equals(jobGraph.getJobID())) {
-					throw new Exception("JobManager responded for wrong Job. This Job: " +
-							jobGraph.getJobID() + ", response: " + respondedID);
-				}
-			}
-			else {
-				throw new Exception("Unexpected response: " + result);
-			}
-		}
-		catch (JobExecutionException e) {
-			throw e;
+			Future<Object> future = jobManagerGateway.ask(
+					new JobManagerMessages.SubmitJob(jobGraph, false),
+					timeout);
+			
+			result = Await.result(future, timeout);
 		}
 		catch (TimeoutException e) {
 			throw new JobTimeoutException(jobGraph.getJobID(),
@@ -237,6 +249,33 @@ public class JobClient {
 			throw new JobExecutionException(jobGraph.getJobID(),
 					"Failed to send job to JobManager: " + t.getMessage(), t.getCause());
 		}
+		
+		if (result instanceof JobManagerMessages.JobSubmitSuccess) {
+			JobID respondedID = ((JobManagerMessages.JobSubmitSuccess) result).jobId();
+			
+			// validate response
+			if (!respondedID.equals(jobGraph.getJobID())) {
+				throw new JobExecutionException(jobGraph.getJobID(),
+						"JobManager responded for wrong Job. This Job: " +
+						jobGraph.getJobID() + ", response: " + respondedID);
+			}
+		}
+		else if (result instanceof JobManagerMessages.JobResultFailure) {
+			try {
+				SerializedThrowable t = ((JobManagerMessages.JobResultFailure) result).cause();
+				throw t.deserializeError(userCodeClassloader);
+			}
+			catch (JobExecutionException e) {
+				throw e;
+			}
+			catch (Throwable t) {
+				throw new JobExecutionException(jobGraph.getJobID(),
+						"JobSubmission failed: " + t.getMessage(), t);
+			}
+		}
+		else {
+			throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + result);
+		}
 	}
 
 	/**
@@ -249,13 +288,10 @@ public class JobClient {
 	 * @param timeout    Timeout for futures
 	 * @throws IOException Thrown, if the file upload to the JobManager failed.
 	 */
-	public static void uploadJarFiles(
-			JobGraph jobGraph,
-			ActorGateway jobManagerGateway,
-			FiniteDuration timeout)
+	public static void uploadJarFiles(JobGraph jobGraph, ActorGateway jobManagerGateway, FiniteDuration timeout)
 			throws IOException {
+		
 		if (jobGraph.hasUsercodeJarFiles()) {
-
 			Future<Object> futureBlobPort = jobManagerGateway.ask(
 					JobManagerMessages.getRequestBlobManagerPort(),
 					timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index b4bfc8f..16c6baf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -43,17 +43,15 @@ public class JobClientActor extends FlinkUntypedActor {
 	private final Logger logger;
 	private final boolean sysoutUpdates;
 
-	// leader session ID of the JobManager when this actor was created
+	/** leader session ID of the JobManager when this actor was created */
 	private final Option<UUID> leaderSessionID;
 
-	// Actor which submits a job to the JobManager via this actor
+	/** Actor which submits a job to the JobManager via this actor */
 	private ActorRef submitter;
 
-	public JobClientActor(
-			ActorRef jobManager,
-			Logger logger,
-			boolean sysoutUpdates,
-			Option<UUID> leaderSessionID) {
+	public JobClientActor(ActorRef jobManager, Logger logger, boolean sysoutUpdates,
+							Option<UUID> leaderSessionID) {
+		
 		this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
 		this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
 		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
@@ -113,9 +111,14 @@ public class JobClientActor extends FlinkUntypedActor {
 		}
 		// acknowledgement to submit job is only logged, our original
 		// submitter is only interested in the final job result
-		else if (message instanceof JobManagerMessages.JobResultSuccess) {
+		else if (message instanceof JobManagerMessages.JobResultSuccess ||
+				message instanceof JobManagerMessages.JobResultFailure) {
+			
+			if (logger.isDebugEnabled()) {
+				logger.debug("Received {} message from JobManager", message.getClass().getSimpleName());
+			}
+
 			// forward the success to the original job submitter
-			logger.debug("Received JobResultSuccess message from JobManager");
 			if (this.submitter != null) {
 				this.submitter.tell(decorateMessage(message), getSelf());
 			}
@@ -124,17 +127,10 @@ public class JobClientActor extends FlinkUntypedActor {
 			getContext().unwatch(jobManager);
 			getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
 		}
-		else if (message instanceof Status.Success) {
+		else if (message instanceof JobManagerMessages.JobSubmitSuccess) {
 			// job was successfully submitted :-)
 			logger.info("Job was successfully submitted to the JobManager");
 		}
-		else if (message instanceof Status.Failure) {
-			// job execution failed, inform the actor that submitted the job
-			logger.debug("Received failure from JobManager", ((Status.Failure) message).cause());
-			if (submitter != null) {
-				submitter.tell(decorateMessage(message), sender());
-			}
-		}
 
 		// =========== Actor / Communication Failure ===============
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
index 56ccef5..7871a8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
@@ -29,7 +29,7 @@ public class JobExecutionException extends Exception {
 
 	private static final long serialVersionUID = 2818087325120827525L;
 
-	private JobID jobID;
+	private final JobID jobID;
 
 	/**
 	 * Constructs a new job execution exception.
@@ -37,21 +37,18 @@ public class JobExecutionException extends Exception {
 	 * @param msg The cause for the execution exception.
 	 * @param cause The cause of the exception
 	 */
-	public JobExecutionException(final JobID jobID, final String msg, final Throwable cause) {
+	public JobExecutionException(JobID jobID, String msg, Throwable cause) {
 		super(msg, cause);
-
 		this.jobID = jobID;
 	}
 
-	public JobExecutionException(final JobID jobID, final String msg) {
+	public JobExecutionException(JobID jobID, String msg) {
 		super(msg);
-
 		this.jobID = jobID;
 	}
 
-	public JobExecutionException(final JobID jobID, final Throwable cause) {
+	public JobExecutionException(JobID jobID, Throwable cause) {
 		super(cause);
-
 		this.jobID = jobID;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c7191fa..189682b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,9 +46,12 @@ import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -137,7 +140,7 @@ public class Execution implements Serializable {
 	private ExecutionContext executionContext;
 
 	/* Lock for updating the accumulators atomically. */
-	private final Object accumulatorLock = new Object();
+	private final SerializableObject accumulatorLock = new SerializableObject();
 
 	/* Continuously updated map of user-defined accumulators */
 	private volatile Map<String, Accumulator<?, ?>> userAccumulators;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9648a8f..3602372 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -45,8 +46,8 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -908,9 +909,11 @@ public class ExecutionGraph implements Serializable {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Updates the state of the Task and sets the final accumulator results.
-	 * @param state
-	 * @return
+	 * Updates the state of one of the ExecutionVertex's Execution attempts.
+	 * If the new status is "FINISHED", this also sets the final accumulator results.
+	 * 
+	 * @param state The state update.
+	 * @return True, if the task update was properly applied, false, if the execution attempt was not found.
 	 */
 	public boolean updateState(TaskExecutionState state) {
 		Execution attempt = this.currentExecutions.get(state.getID());
@@ -926,8 +929,9 @@ public class ExecutionGraph implements Serializable {
 						AccumulatorSnapshot accumulators = state.getAccumulators();
 						flinkAccumulators = accumulators.deserializeFlinkAccumulators();
 						userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader);
-					} catch (Exception e) {
-						// Exceptions would be thrown in the future here
+					}
+					catch (Exception e) {
+						// we do not fail the job on deserialization problems of accumulators, but only log
 						LOG.error("Failed to deserialize final accumulator results.", e);
 					}
 
@@ -1029,12 +1033,9 @@ public class ExecutionGraph implements Serializable {
 	
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
 		if (jobStatusListenerActors.size() > 0) {
-			SerializedThrowable serializedThrowable = null;
-			if(error != null) {
-				serializedThrowable = new SerializedThrowable(error);
-			}
 			ExecutionGraphMessages.JobStatusChanged message =
-					new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(), serializedThrowable);
+					new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(),
+							error == null ? null : new SerializedThrowable(error));
 
 			for (ActorGateway listener: jobStatusListenerActors) {
 				listener.tell(message);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 07b7ee8..c7ea06e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -104,7 +104,7 @@ public class TaskExecutionState implements java.io.Serializable {
 		this.jobID = jobID;
 		this.executionId = executionId;
 		this.executionState = executionState;
-		if(error != null) {
+		if (error != null) {
 			this.throwable = new SerializedThrowable(error);
 		} else {
 			this.throwable = null;
@@ -115,18 +115,18 @@ public class TaskExecutionState implements java.io.Serializable {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Gets the attached exception. Requires to pass a classloader, because the
-	 * class of the exception may be user-defined and hence only accessible through
-	 * the user code classloader, not the default classloader.
-	 *
-	 * @param usercodeClassloader The class loader for the user code of the
-	 *                            job this update refers to.
+	 * Gets the attached exception, which is in serialized form. Returns null,
+	 * if the status update is no failure with an associated exception.
+	 * 
+	 * @param userCodeClassloader The classloader that can resolve user-defined exceptions.
+	 * @return The attached exception, or null, if none.
 	 */
-	public Throwable getError(ClassLoader usercodeClassloader) {
+	public Throwable getError(ClassLoader userCodeClassloader) {
 		if (this.throwable == null) {
 			return null;
-		} else {
-			return throwable.deserializeError(usercodeClassloader);
+		}
+		else {
+			return this.throwable.deserializeError(userCodeClassloader);
 		}
 	}
 
@@ -173,8 +173,7 @@ public class TaskExecutionState implements java.io.Serializable {
 			return other.jobID.equals(this.jobID) &&
 					other.executionId.equals(this.executionId) &&
 					other.executionState == this.executionState &&
-					(other.throwable == null ? this.throwable == null :
-						(this.throwable != null && throwable.equals(other.throwable) ));
+					(other.throwable == null) == (this.throwable == null);
 		}
 		else {
 			return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
index 6e5a558..59178c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
@@ -15,101 +15,164 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.util;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
 import java.io.Serializable;
-import java.util.Arrays;
+import java.lang.ref.WeakReference;
 
 /**
- * Utility class for dealing with serialized Throwables.
- * Needed to send around user-specific exception classes with Akka.
+ * Utility class for dealing with user-defined Throwable types that are serialized (for
+ * example during RPC/Actor communication), but cannot be resolved with the default
+ * class loader.
+ * <p>
+ * This exception mimics the original exception with respect to message and stack trace,
+ * and contains the original exception in serialized form. The original exception
+ * can be re-obtained by supplying the appropriate class loader.
  */
 public class SerializedThrowable extends Exception implements Serializable {
+	
 	private static final long serialVersionUID = 7284183123441947635L;
-	private final byte[] serializedError;
+	
+	/** The original exception in serialized form */
+	private final byte[] serializedException;
+	
+	/** Name of the original error class */
+	private final String originalErrorClassName;
+	
+	/** The original stack trace, to be printed */
+	private final String fullStingifiedStackTrace;
 
-	// The exception must not be (de)serialized with the class, as its
-	// class may not be part of the system class loader.
-	private transient Throwable cachedError;
+	/** A guaranteed serializable placeholder exception that will be used as
+	 * cause and to capture the original stack trace */
+	private final Exception placeholder;
+	
+	/** The original exception, not transported via serialization, 
+	 * because the class may not be part of the system class loader.
+	 * In addition, we make sure our cached references do not prevent
+	 * unloading the exception class. */
+	private transient WeakReference<Throwable> cachedException;
 
 
 	/**
 	 * Create a new SerializedThrowable.
-	 * @param error The exception to serialize.
+	 * 
+	 * @param exception The exception to serialize.
 	 */
-	public SerializedThrowable(Throwable error) {
-		Preconditions.checkNotNull(error, "The exception to serialize has to be set");
-		this.cachedError = error;
-		byte[] serializedError;
-		try {
-			serializedError = InstantiationUtil.serializeObject(error);
-		}
-		catch (Throwable t) {
-			// could not serialize exception. send the stringified version instead
+	public SerializedThrowable(Throwable exception) {
+		super(getMessageOrError(exception));
+
+		if (!(exception instanceof SerializedThrowable)) {
+			this.cachedException = new WeakReference<Throwable>(exception);
+			
+			this.originalErrorClassName = exception.getClass().getName();
+			this.fullStingifiedStackTrace = ExceptionUtils.stringifyException(exception);
+			this.placeholder = new Exception(
+					"Serialized representation of " + originalErrorClassName + ": " + getMessage());
+			this.placeholder.setStackTrace(exception.getStackTrace());
+			initCause(this.placeholder);
+			
+			byte[] serialized;
 			try {
-				this.cachedError = new Exception(ExceptionUtils.stringifyException(error));
-				serializedError = InstantiationUtil.serializeObject(this.cachedError);
+				serialized = InstantiationUtil.serializeObject(exception);
 			}
-			catch (Throwable tt) {
-				// seems like we cannot do much to report the actual exception
-				// report a placeholder instead
+			catch (Throwable t) {
+				// could not serialize exception. send the stringified version instead
 				try {
-					this.cachedError = new Exception("Cause is a '" + error.getClass().getName()
-							+ "' (failed to serialize or stringify)");
-					serializedError = InstantiationUtil.serializeObject(this.cachedError);
+					serialized = InstantiationUtil.serializeObject(placeholder);
 				}
-				catch (Throwable ttt) {
-					// this should never happen unless the JVM is fubar.
-					// we just report the state without the error
-					this.cachedError = null;
-					serializedError = null;
+				catch (IOException e) {
+					// this should really never happen, as we only serialize a standard exception
+					throw new RuntimeException(e.getMessage(), e);
 				}
 			}
+			this.serializedException = serialized;
+		}
+		else {
+			// copy from that serialized throwable
+			SerializedThrowable other = (SerializedThrowable) exception;
+			this.serializedException = other.serializedException;
+			this.originalErrorClassName = other.originalErrorClassName;
+			this.fullStingifiedStackTrace = other.fullStingifiedStackTrace;
+			this.placeholder = other.placeholder;
+			this.cachedException = other.cachedException;
 		}
-		this.serializedError = serializedError;
 	}
 
 	public Throwable deserializeError(ClassLoader userCodeClassloader) {
-		if (this.cachedError == null) {
+		Throwable cached = cachedException == null ? null : cachedException.get();
+		if (cached == null) {
 			try {
-				cachedError = (Throwable) InstantiationUtil.deserializeObject(this.serializedError, userCodeClassloader);
+				cached = (Throwable) InstantiationUtil.deserializeObject(serializedException, userCodeClassloader);
+				cachedException = new WeakReference<Throwable>(cached);
 			}
 			catch (Exception e) {
-				throw new RuntimeException("Error while deserializing the attached exception", e);
+				return placeholder;
 			}
 		}
-		return this.cachedError;
+		return cached;
 	}
+	
+	public String getStrigifiedStackTrace() {
+		return fullStingifiedStackTrace;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Override the behavior of Throwable
+	// ------------------------------------------------------------------------
 
 	@Override
-	public boolean equals(Object obj) {
-		if(obj instanceof SerializedThrowable) {
-			return Arrays.equals(this.serializedError, ((SerializedThrowable)obj).serializedError);
-		}
-		return false;
+	public Throwable getCause() {
+		return placeholder;
 	}
 
 	@Override
+	public void printStackTrace(PrintStream s) {
+		s.print(fullStingifiedStackTrace);
+		s.flush();
+	}
+	
+	@Override
+	public void printStackTrace(PrintWriter s) {
+		s.print(fullStingifiedStackTrace);
+		s.flush();
+	}
+	
+	@Override
 	public String toString() {
-		if(cachedError != null) {
-			return cachedError.getClass().getName() + ": " + cachedError.getMessage();
-		}
-		if(serializedError == null) {
-			return "(null)"; // can not happen as per Ctor check.
-		} else {
-			return "(serialized)";
-		}
+		String message = getLocalizedMessage();
+		return (message != null) ? (originalErrorClassName + ": " + message) : originalErrorClassName;
+	}
+
+	@Override
+	public StackTraceElement[] getStackTrace() {
+		return placeholder.getStackTrace();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Static utilities
+	// ------------------------------------------------------------------------
+	
 	public static Throwable get(Throwable serThrowable, ClassLoader loader) {
-		if(serThrowable instanceof SerializedThrowable) {
+		if (serThrowable instanceof SerializedThrowable) {
 			return ((SerializedThrowable)serThrowable).deserializeError(loader);
 		} else {
 			return serThrowable;
 		}
 	}
+	
+	private static String getMessageOrError(Throwable error) {
+		try {
+			return error.getMessage();
+		}
+		catch (Throwable t) {
+			return "(failed to get message)";
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a67c23a..e9a31fb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -23,7 +23,7 @@ import java.lang.reflect.{InvocationTargetException, Constructor}
 import java.net.InetSocketAddress
 import java.util.{UUID, Collections}
 
-import akka.actor.Status.{Failure, Success}
+import akka.actor.Status.Failure
 import akka.actor._
 
 import grizzled.slf4j.Logger
@@ -326,21 +326,18 @@ class JobManager(
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
 
-          val deserializedError = if(error != null) {
-            error.deserializeError(executionGraph.getUserClassLoader)
-          } else null
           log.info(
             s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
-            deserializedError)
+            error)
 
-          if (newJobStatus.isTerminalState) {
+          if (newJobStatus.isTerminalState()) {
             jobInfo.end = timeStamp
 
             // is the client waiting for the job result?
             newJobStatus match {
               case JobStatus.FINISHED =>
                 val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
-                  executionGraph.getAccumulatorsSerialized
+                  executionGraph.getAccumulatorsSerialized()
                 } catch {
                   case e: Exception =>
                     log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
@@ -353,24 +350,26 @@ class JobManager(
                 jobInfo.client ! decorateMessage(JobResultSuccess(result))
 
               case JobStatus.CANCELED =>
-                jobInfo.client ! decorateMessage(
-                  Failure(
-                    new JobCancellationException(
-                      jobID,
-                      "Job was cancelled.",
-                      new SerializedThrowable(deserializedError))))
+                // the error may be packed as a serialized throwable
+                val unpackedError = SerializedThrowable.get(
+                  error, executionGraph.getUserClassLoader())
+                
+                jobInfo.client ! decorateMessage(JobResultFailure(
+                  new SerializedThrowable(
+                    new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
 
               case JobStatus.FAILED =>
-                jobInfo.client ! decorateMessage(
-                  Failure(
-                    new JobExecutionException(
-                      jobID,
-                      "Job execution failed.",
-                      new SerializedThrowable(deserializedError))))
+                val unpackedError = SerializedThrowable.get(
+                  error, executionGraph.getUserClassLoader())
+                
+                jobInfo.client ! decorateMessage(JobResultFailure(
+                  new SerializedThrowable(
+                    new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
 
               case x =>
                 val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
-                jobInfo.client ! decorateMessage(Failure(new SerializedThrowable(exception)))
+                jobInfo.client ! decorateMessage(JobResultFailure(
+                  new SerializedThrowable(exception)))
                 throw exception
             }
 
@@ -525,11 +524,11 @@ class JobManager(
    */
   private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean): Unit = {
     if (jobGraph == null) {
-      sender ! decorateMessage(
-        Failure(
+      sender() ! decorateMessage(JobResultFailure(
+        new SerializedThrowable(
           new JobSubmissionException(null, "JobGraph must not be null.")
         )
-      )
+      ))
     }
     else {
       val jobId = jobGraph.getJobID
@@ -568,17 +567,17 @@ class JobManager(
             executionContext,
             jobGraph.getJobID,
             jobGraph.getName,
-            jobGraph.getJobConfiguration,
+            jobGraph.getJobConfiguration(),
             timeout,
-            jobGraph.getUserJarBlobKeys,
+            jobGraph.getUserJarBlobKeys(),
             userCodeLoader),
             JobInfo(sender(), System.currentTimeMillis())
           )
         )._1
 
         // configure the execution graph
-        val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
-          jobGraph.getNumberOfExecutionRetries
+        val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries() >= 0) {
+          jobGraph.getNumberOfExecutionRetries()
         } else {
           defaultExecutionRetries
         }
@@ -612,8 +611,9 @@ class JobManager(
             vertex.initializeOnMaster(userCodeLoader)
           }
           catch {
-            case t: Throwable => throw new JobExecutionException(jobId,
-              "Cannot initialize task '" + vertex.getName + "': " + t.getMessage, t)
+            case t: Throwable =>
+              throw new JobExecutionException(jobId,
+                "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t)
           }
         }
 
@@ -643,13 +643,13 @@ class JobManager(
           }
 
           val triggerVertices: java.util.List[ExecutionJobVertex] =
-            snapshotSettings.getVerticesToTrigger.asScala.map(idToVertex).asJava
+            snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
 
           val ackVertices: java.util.List[ExecutionJobVertex] =
-            snapshotSettings.getVerticesToAcknowledge.asScala.map(idToVertex).asJava
+            snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
 
           val confirmVertices: java.util.List[ExecutionJobVertex] =
-            snapshotSettings.getVerticesToConfirm.asScala.map(idToVertex).asJava
+            snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
 
           executionGraph.enableSnapshotCheckpointing(
             snapshotSettings.getCheckpointInterval,
@@ -673,7 +673,7 @@ class JobManager(
         }
 
         // done with submitting the job
-        sender ! decorateMessage(Success(jobGraph.getJobID))
+        sender() ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
       }
       catch {
         case t: Throwable =>
@@ -692,7 +692,7 @@ class JobManager(
             new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t)
           }
 
-          sender ! decorateMessage(Failure(rt))
+          sender() ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
           return
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index ce6fdf3..83bafaa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -25,13 +25,16 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID}
-import org.apache.flink.runtime.util.SerializedThrowable
 
 /**
  * This object contains the execution graph specific messages.
  */
 object ExecutionGraphMessages {
 
+  // --------------------------------------------------------------------------
+  //  Messages
+  // --------------------------------------------------------------------------
+  
   /**
    * Denotes the execution state change of an
    * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]]
@@ -64,6 +67,7 @@ object ExecutionGraphMessages {
       } else {
         ""
       }
+      
       s"${timestampToString(timestamp)}\t$taskName(${subtaskIndex +
         1}/$totalNumberOfSubTasks) switched to $newExecutionState $oMsg"
     }
@@ -75,23 +79,29 @@ object ExecutionGraphMessages {
    * @param jobID identifying the corresponding job
    * @param newJobStatus
    * @param timestamp
-   * @param serializedError
+   * @param error
    */
   case class JobStatusChanged(
       jobID: JobID,
       newJobStatus: JobStatus,
       timestamp: Long,
-      serializedError: SerializedThrowable)
+      error: Throwable)
     extends RequiresLeaderSessionID {
+    
     override def toString: String = {
       s"${timestampToString(timestamp)}\tJob execution switched to status $newJobStatus."
     }
   }
 
+  // --------------------------------------------------------------------------
+  //  Utilities
+  // --------------------------------------------------------------------------
+  
   private val DATE_FORMATTER: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
 
   private def timestampToString(timestamp: Long): String = {
-    DATE_FORMATTER.format(new Date(timestamp))
+    DATE_FORMATTER.synchronized {
+      DATE_FORMATTER.format(new Date(timestamp))
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index e38986b..1c250af 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGra
 import org.apache.flink.runtime.instance.{InstanceID, Instance}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.util.SerializedThrowable
 
 import scala.collection.JavaConverters._
 
@@ -163,10 +164,23 @@ object JobManagerMessages {
   case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])
 
   /**
+   * Denotes a successful job submission.
+   * @param jobId The job's ID.
+   */
+  case class JobSubmitSuccess(jobId: JobID)
+  
+  /**
    * Denotes a successful job execution.
+   * @param result The result of the job execution, in serialized form.
    */
   case class JobResultSuccess(result: SerializedJobExecutionResult)
 
+  /**
+   * Denotes an unsuccessful job execution.
+   * @param cause The exception that caused the job to fail, in serialized form.
+   */
+  case class JobResultFailure(cause: SerializedThrowable)
+
 
   sealed trait CancellationResponse{
     def jobID: JobID

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 5925c96..c4c35f8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -19,23 +19,25 @@
 package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
-import akka.pattern.Patterns.gracefulStop
 
+import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
+
 import com.typesafe.config.Config
+
 import org.apache.flink.api.common.{JobExecutionResult, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.{JobExecutionException, JobClient,
-SerializedJobExecutionResult}
+import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.webmonitor.WebMonitor
+
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
@@ -258,12 +260,16 @@ abstract class FlinkMiniCluster(
       jobGraph,
       timeout,
       printUpdates,
-      this.getClass.getClassLoader)
+      this.getClass().getClassLoader())
   }
 
   @throws(classOf[JobExecutionException])
   def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = {
-    JobClient.submitJobDetached(getJobManagerGateway(), jobGraph, timeout)
+    JobClient.submitJobDetached(
+      getJobManagerGateway(),
+      jobGraph,
+      timeout,
+      getClass().getClassLoader())
     new JobSubmissionResult(jobGraph.getJobID)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 8b01f06..217f46e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -19,9 +19,10 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorSystem;
-import akka.actor.Status;
 import akka.testkit.JavaTestKit;
+
 import com.typesafe.config.Config;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
@@ -46,9 +48,11 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
@@ -124,7 +128,7 @@ public class JobManagerTest {
 
 				// Submit the job and wait for all vertices to be running
 				jobManagerGateway.tell(new SubmitJob(jobGraph, false), testActorGateway);
-				expectMsgClass(Status.Success.class);
+				expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
 
 				jobManagerGateway.tell(
 						new WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID()),

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 52154e1..1d0b3b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -109,7 +109,8 @@ public class TaskCancelTest {
 
 			// Run test
 			JobClient.submitJobDetached(
-					flink.getJobManagerGateway(), jobGraph, TestingUtils.TESTING_DURATION());
+					flink.getJobManagerGateway(), jobGraph,
+					TestingUtils.TESTING_DURATION(), getClass().getClassLoader());
 
 			// Wait for the job to make some progress and then cancel
 			awaitRunning(

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
index f4c7a57..b3f2456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+
 import org.junit.Test;
 
 /**
@@ -88,7 +89,7 @@ public class TaskExecutionStateTest {
 	}
 	
 	@Test
-	public void hanleNonSerializableException() {
+	public void handleNonSerializableException() {
 		try {
 			@SuppressWarnings({"ThrowableInstanceNeverThrown", "serial"})
 			Exception hostile = new Exception() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
new file mode 100644
index 0000000..3dca362
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.CodeSource;
+import java.security.Permissions;
+import java.security.ProtectionDomain;
+import java.security.cert.Certificate;
+
+import static org.junit.Assert.*;
+
+public class SerializedThrowableTest {
+	
+	@Test
+	public void testIdenticalMessageAndStack() {
+		try {
+			IllegalArgumentException original = new IllegalArgumentException("test message");
+			SerializedThrowable serialized = new SerializedThrowable(original);
+			
+			assertEquals(original.getMessage(), serialized.getMessage());
+			assertEquals(original.toString(), serialized.toString());
+			
+			assertEquals(ExceptionUtils.stringifyException(original),
+							ExceptionUtils.stringifyException(serialized));
+			
+			assertArrayEquals(original.getStackTrace(), serialized.getStackTrace());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialization() {
+		try {
+			// We need an exception whose class is not in the core class loader
+			// we solve that by defining an exception class dynamically
+			
+			// an exception class, as bytes 
+			final byte[] classData = {
+					-54, -2, -70, -66, 0, 0, 0, 51, 0, 21, 10, 0, 3, 0, 18, 7, 0, 19, 7, 0, 20, 1,
+					0, 16, 115, 101, 114, 105, 97, 108, 86, 101, 114, 115, 105, 111, 110, 85, 73,
+					68, 1, 0, 1, 74, 1, 0, 13, 67, 111, 110, 115, 116, 97, 110, 116, 86, 97, 108,
+					117, 101, 5, -103, -52, 22, -41, -23, -36, -25, 47, 1, 0, 6, 60, 105, 110, 105,
+					116, 62, 1, 0, 3, 40, 41, 86, 1, 0, 4, 67, 111, 100, 101, 1, 0, 15, 76, 105,
+					110, 101, 78, 117, 109, 98, 101, 114, 84, 97, 98, 108, 101, 1, 0, 18, 76, 111,
+					99, 97, 108, 86, 97, 114, 105, 97, 98, 108, 101, 84, 97, 98, 108, 101, 1, 0, 4,
+					116, 104, 105, 115, 1, 0, 61, 76, 111, 114, 103, 47, 97, 112, 97, 99, 104, 101,
+					47, 102, 108, 105, 110, 107, 47, 114, 117, 110, 116, 105, 109, 101, 47, 117,
+					116, 105, 108, 47, 84, 101, 115, 116, 69, 120, 99, 101, 112, 116, 105, 111,
+					110, 70, 111, 114, 83, 101, 114, 105, 97, 108, 105, 122, 97, 116, 105, 111,
+					110, 59, 1, 0, 10, 83, 111, 117, 114, 99, 101, 70, 105, 108, 101, 1, 0, 34, 84,
+					101, 115, 116, 69, 120, 99, 101, 112, 116, 105, 111, 110, 70, 111, 114, 83,
+					101, 114, 105, 97, 108, 105, 122, 97, 116, 105, 111, 110, 46, 106, 97, 118, 97,
+					12, 0, 9, 0, 10, 1, 0, 59, 111, 114, 103, 47, 97, 112, 97, 99, 104, 101, 47,
+					102, 108, 105, 110, 107, 47, 114, 117, 110, 116, 105, 109, 101, 47, 117, 116,
+					105, 108, 47, 84, 101, 115, 116, 69, 120, 99, 101, 112, 116, 105, 111, 110, 70,
+					111, 114, 83, 101, 114, 105, 97, 108, 105, 122, 97, 116, 105, 111, 110, 1, 0,
+					19, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 69, 120, 99, 101, 112, 116,
+					105, 111, 110, 0, 33, 0, 2, 0, 3, 0, 0, 0, 1, 0, 26, 0, 4, 0, 5, 0, 1, 0, 6, 0,
+					0, 0, 2, 0, 7, 0, 1, 0, 1, 0, 9, 0, 10, 0, 1, 0, 11, 0, 0, 0, 47, 0, 1, 0, 1, 0,
+					0, 0, 5, 42, -73, 0, 1, -79, 0, 0, 0, 2, 0, 12, 0, 0, 0, 6, 0, 1, 0, 0, 0, 21,
+					0, 13, 0, 0, 0, 12, 0, 1, 0, 0, 0, 5, 0, 14, 0, 15, 0, 0, 0, 1, 0, 16, 0, 0, 0,
+					2, 0, 17};
+
+			// dummy class loader that has no access to any classes
+			ClassLoader loader = new URLClassLoader(new URL[0]);
+
+			// define a class into the classloader
+			Class<?> clazz = MemoryUtils.UNSAFE.defineClass(
+					"org.apache.flink.runtime.util.TestExceptionForSerialization",
+					classData, 0, classData.length,
+					loader,
+					new ProtectionDomain(new CodeSource(null, (Certificate[]) null), new Permissions()));
+			
+			// create an instance of the exception (no message, no cause)
+			Exception userException = clazz.asSubclass(Exception.class).newInstance();
+			
+			// check that we cannot simply copy the exception
+			try {
+				byte[] serialized = InstantiationUtil.serializeObject(userException);
+				InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+				fail("should fail with a class not found exception");
+			}
+			catch (ClassNotFoundException e) {
+				// as we want it
+			}
+			
+			// validate that the SerializedThrowable mimics the original exception
+			SerializedThrowable serialized = new SerializedThrowable(userException);
+			assertEquals(userException.getMessage(), serialized.getMessage());
+			assertEquals(userException.toString(), serialized.toString());
+			assertEquals(ExceptionUtils.stringifyException(userException),
+					ExceptionUtils.stringifyException(serialized));
+			assertArrayEquals(userException.getStackTrace(), serialized.getStackTrace());
+
+			// copy the serialized throwable and make sure everything still works
+			SerializedThrowable copy = CommonTestUtils.createCopySerializable(serialized);
+			assertEquals(userException.getMessage(), copy.getMessage());
+			assertEquals(userException.toString(), copy.toString());
+			assertEquals(ExceptionUtils.stringifyException(userException),
+					ExceptionUtils.stringifyException(copy));
+			assertArrayEquals(userException.getStackTrace(), copy.getStackTrace());
+			
+			// deserialize the proper exception
+			Throwable deserialized = copy.deserializeError(loader); 
+			assertEquals(clazz, deserialized.getClass());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	} 
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index 718496c..94ead78 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -24,7 +24,7 @@ import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -68,12 +68,12 @@ class CoLocationConstraintITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val gateway = cluster.getJobManagerGateway
+      val gateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           gateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 3238dd5..ddca3e2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -66,10 +66,10 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Test Job", vertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
-        val response = (jmGateway.ask(RequestTotalNumberOfSlots, timeout.duration)).mapTo[Int]
+        val response = jmGateway.ask(RequestTotalNumberOfSlots, timeout.duration).mapTo[Int]
 
         val availableSlots = Await.result(response, duration)
 
@@ -77,21 +77,16 @@ class JobManagerITCase(_system: ActorSystem)
 
         within(2 second) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-
-          val success = expectMsgType[Success]
-
-          jobGraph.getJobID should equal(success.status)
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
         }
 
         within(2 second) {
-          val response = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(response.cause, this.getClass.getClassLoader)
+          val response = expectMsgType[JobResultFailure]
+          val exception = response.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
-              val cause = e.getCause.asInstanceOf[SerializedThrowable].deserializeError(
-                this.getClass.getClassLoader)
-              new NoResourceAvailableException(1,1,0) should equal(cause)
+              new NoResourceAvailableException(1,1,0) should equal(e.getCause())
             case e => fail(s"Received wrong exception of type $e.")
           }
         }
@@ -113,10 +108,10 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Test Job", vertex)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
-        val response = (jmGateway.ask(RequestTotalNumberOfSlots, timeout.duration)).mapTo[Int]
+        val response = jmGateway.ask(RequestTotalNumberOfSlots, timeout.duration).mapTo[Int]
 
         val availableSlots = Await.result(response, duration)
 
@@ -125,9 +120,9 @@ class JobManagerITCase(_system: ActorSystem)
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+          
           val result = expectMsgType[JobResultSuccess]
-
           result.result.getJobId() should equal(jobGraph.getJobID)
         }
 
@@ -149,13 +144,13 @@ class JobManagerITCase(_system: ActorSystem)
       jobGraph.setAllowQueuedScheduling(true)
 
       val cluster = TestingUtils.startTestingCluster(10)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
@@ -184,13 +179,13 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
@@ -219,13 +214,13 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
         }
@@ -256,15 +251,15 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
       val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
 
           exception match {
             case e: JobExecutionException =>
@@ -301,12 +296,12 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
       val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
         }
@@ -345,13 +340,13 @@ class JobManagerITCase(_system: ActorSystem)
       jobGraph.setScheduleMode(ScheduleMode.ALL)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
 
@@ -379,7 +374,7 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
@@ -389,10 +384,10 @@ class JobManagerITCase(_system: ActorSystem)
 
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -427,7 +422,7 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
@@ -437,10 +432,10 @@ class JobManagerITCase(_system: ActorSystem)
 
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -472,15 +467,15 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -512,7 +507,7 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
@@ -520,10 +515,10 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(num_tasks)
 
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -560,7 +555,7 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
@@ -568,10 +563,10 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(num_tasks)
 
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -603,13 +598,13 @@ class JobManagerITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, sink)
 
       val cluster = TestingUtils.startTestingCluster(2*num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try{
         within(TestingUtils.TESTING_DURATION){
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]
         }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 0c0e28a..d454e69 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.Status.Success
 import akka.actor.{PoisonPill, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{ JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
@@ -79,13 +78,13 @@ class RecoveryITCase(_system: ActorSystem)
       jobGraph.setNumberOfExecutionRetries(1)
 
       val cluster = startTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 s")
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION){
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
@@ -122,13 +121,13 @@ class RecoveryITCase(_system: ActorSystem)
       jobGraph.setNumberOfExecutionRetries(1)
 
       val cluster = startTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s")
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION){
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
@@ -166,13 +165,13 @@ class RecoveryITCase(_system: ActorSystem)
 
       val cluster = startTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s")
 
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION){
           jmGateway.tell(SubmitJob(jobGraph, false), self)
 
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID), self)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index 406b054..23e8ac1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -24,7 +24,7 @@ import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -66,12 +66,12 @@ class SlotSharingITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]
 
         }
@@ -110,12 +110,12 @@ class SlotSharingITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
 
       try {
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba53558/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 7017c33..ae51da2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.Status.{Failure, Success}
 import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.apache.flink.runtime.util.SerializedThrowable
@@ -70,13 +69,13 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
       val jobID = jobGraph.getJobID
 
       val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
       val taskManagers = cluster.getTaskManagers
 
       try{
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
 
@@ -85,8 +84,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
           //kill task manager
           taskManagers(0) ! PoisonPill
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
@@ -119,13 +118,13 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
       val jobID = jobGraph.getJobID
 
       val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
-      val jmGateway = cluster.getJobManagerGateway
+      val jmGateway = cluster.getJobManagerGateway()
       val taskManagers = cluster.getTaskManagers
 
       try{
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
-          expectMsg(Success(jobGraph.getJobID))
+          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
           expectMsg(AllVerticesRunning(jobID))
@@ -133,8 +132,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
           //kill task manager
           taskManagers(0) ! Kill
 
-          val failure = expectMsgType[Failure]
-          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          val failure = expectMsgType[JobResultFailure]
+          val exception = failure.cause.deserializeError(getClass.getClassLoader())
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)


[3/4] flink git commit: [FLINK-2543] [core] Fix user object deserialization for file-based state handles.

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index d108798..1952760 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobMan
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.apache.flink.runtime.util.SerializedThrowable
 import org.apache.flink.test.util.ForkableFlinkMiniCluster
 
 import org.junit.runner.RunWith
@@ -126,8 +127,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
           }
 
           val failure = expectMsgType[Failure]
-
-          failure.cause match {
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 
@@ -169,8 +170,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
           taskManagers(0) ! Kill
 
           val failure = expectMsgType[Failure]
-
-          failure.cause match {
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 
@@ -208,8 +209,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
           tm ! PoisonPill
 
           val failure = expectMsgType[Failure]
-
-          failure.cause match {
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 


[4/4] flink git commit: [FLINK-2543] [core] Fix user object deserialization for file-based state handles.

Posted by se...@apache.org.
[FLINK-2543] [core] Fix user object deserialization for file-based state handles.

Send exceptions from JM --> JC in serialized form.
Exceptions sent from the JobManager to the JobClient were relying on
Akka's JavaSerialization, which does not have access to the user code classloader.

This closes #1048


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf8c8e54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf8c8e54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf8c8e54

Branch: refs/heads/master
Commit: bf8c8e54094151348caedd3120931516f76c3cf3
Parents: 554b77b
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Aug 18 18:15:40 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 30 18:21:39 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |   4 +-
 .../org/apache/flink/client/program/Client.java |  12 +-
 .../apache/flink/util/InstantiationUtil.java    |   2 +-
 .../apache/flink/runtime/client/JobClient.java  |  27 ++--
 .../runtime/executiongraph/ExecutionGraph.java  |   7 +-
 .../jobgraph/tasks/OperatorStateCarrier.java    |   2 +-
 .../runtime/state/ByteStreamStateHandle.java    |   6 +-
 .../flink/runtime/state/LocalStateHandle.java   |   3 +-
 .../runtime/state/PartitionedStateHandle.java   |   5 +-
 .../apache/flink/runtime/state/StateHandle.java |   8 +-
 .../runtime/taskmanager/TaskExecutionState.java |  74 ++-------
 .../flink/runtime/util/SerializedThrowable.java | 115 ++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  33 ++--
 .../messages/ExecutionGraphMessages.scala       |   5 +-
 .../runtime/messages/JobClientMessages.scala    |   1 +
 .../runtime/minicluster/FlinkMiniCluster.scala  |  11 +-
 .../PartialConsumePipelinedResultTest.java      |   2 +-
 .../messages/CheckpointMessagesTest.java        |   2 +-
 .../state/ByteStreamStateHandleTest.java        |   7 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  34 +++--
 .../TaskManagerFailsWithSlotSharingITCase.scala |   9 +-
 .../flink/tachyon/FileStateHandleTest.java      |   2 +-
 .../operators/AbstractUdfStreamOperator.java    |   5 +-
 .../streaming/api/state/EagerStateStore.java    |   4 +-
 .../api/state/OperatorStateHandle.java          |   4 +-
 .../api/state/PartitionedStateStore.java        |   2 +-
 .../state/PartitionedStreamOperatorState.java   |   4 +-
 .../api/state/StreamOperatorState.java          |   4 +-
 .../streaming/api/state/WrapperStateHandle.java |   3 +-
 .../streaming/runtime/tasks/StreamTask.java     |  13 +-
 .../streaming/api/state/StateHandleTest.java    |  13 +-
 .../api/state/StatefulOperatorTest.java         |   5 +-
 .../streaming/util/TestStreamEnvironment.java   |   6 +-
 .../flink/test/util/RecordAPITestBase.java      |   4 +-
 .../apache/flink/test/util/TestEnvironment.java |   5 +-
 flink-tests/pom.xml                             |  19 +++
 ...-state-checkpointed-classloader-assembly.xml |  39 +++++
 .../test/classloading/ClassLoaderITCase.java    |  34 ++++-
 .../jar/CheckpointedStreamingProgram.java       | 150 +++++++++++++++++++
 .../JobSubmissionFailsITCase.java               |  19 +--
 .../taskmanager/TaskManagerFailsITCase.scala    |  13 +-
 41 files changed, 519 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 07a3a8e..b288996 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
@@ -176,8 +175,7 @@ public class LocalExecutor extends PlanExecutor {
 				JobGraph jobGraph = jgg.compileJobGraph(op);
 				
 				boolean sysoutPrint = isPrintingStatusDuringExecution();
-				SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph,sysoutPrint);
-				return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+				return flink.submitJobAndWait(jobGraph, sysoutPrint);
 			}
 			finally {
 				if (shutDownAtEnd) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 06156fa..2f5d888 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -53,7 +53,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -425,15 +424,8 @@ public class Client {
 
 		try{
 			if (wait) {
-				SerializedJobExecutionResult result = JobClient.submitJobAndWait(actorSystem, 
-						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution);
-				try {
-					return result.toJobExecutionResult(this.userCodeClassLoader);
-				}
-				catch (Exception e) {
-					throw new ProgramInvocationException(
-							"Failed to deserialize the accumulator result after the job execution", e);
-				}
+				return JobClient.submitJobAndWait(actorSystem,
+						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, this.userCodeClassLoader);
 			}
 			else {
 				JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 9955422..2a158e7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -46,7 +46,7 @@ public class InstantiationUtil {
 	 * user-code ClassLoader.
 	 *
 	 */
-	private static class ClassLoaderObjectInputStream extends ObjectInputStream {
+	public static class ClassLoaderObjectInputStream extends ObjectInputStream {
 		private ClassLoader classLoader;
 
 		private static final HashMap<String, Class<?>> primitiveClasses

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index aeefa61..44d2c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -23,10 +23,10 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.actor.Status;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
+import org.apache.flink.runtime.util.SerializedThrowable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,8 +65,7 @@ public class JobClient {
 	public static ActorSystem startJobClientActorSystem(Configuration config)
 			throws IOException {
 		LOG.info("Starting JobClient actor system");
-		Option<Tuple2<String, Object>> remoting =
-				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+		Option<Tuple2<String, Object>> remoting = new Some<>(new Tuple2<String, Object>("", 0));
 
 		// start a remote actor system to listen on an arbitrary port
 		ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
@@ -123,12 +123,13 @@ public class JobClient {
 	 * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
 	 *                                                               execution fails.
 	 */
-	public static SerializedJobExecutionResult submitJobAndWait(
+	public static JobExecutionResult submitJobAndWait(
 			ActorSystem actorSystem,
 			ActorGateway jobManagerGateway,
 			JobGraph jobGraph,
 			FiniteDuration timeout,
-			boolean sysoutLogUpdates) throws JobExecutionException {
+			boolean sysoutLogUpdates,
+			ClassLoader userCodeClassloader) throws JobExecutionException {
 
 		Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
 		Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
@@ -160,26 +161,30 @@ public class JobClient {
 
 				SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
 				if (result != null) {
-					return result;
+					return result.toJobExecutionResult(userCodeClassloader);
 				} else {
 					throw new Exception("Job was successfully executed but result contained a null JobExecutionResult.");
 				}
-			} else if (answer instanceof Status.Failure) {
-				throw ((Status.Failure) answer).cause();
 			} else {
 				throw new Exception("Unknown answer after submitting the job: " + answer);
 			}
 		}
 		catch (JobExecutionException e) {
-			throw e;
+			if(e.getCause() instanceof SerializedThrowable) {
+				SerializedThrowable serializedThrowable = (SerializedThrowable)e.getCause();
+				Throwable deserialized = serializedThrowable.deserializeError(userCodeClassloader);
+				throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed " + deserialized.getMessage(), deserialized);
+			} else {
+				throw e;
+			}
 		}
 		catch (TimeoutException e) {
 			throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. " +
 					"Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e);
 		}
-		catch (Throwable t) {
+		catch (Throwable throwable) {
 			throw new JobExecutionException(jobGraph.getJobID(),
-					"Communication with JobManager failed: " + t.getMessage(), t);
+					"Communication with JobManager failed: " + throwable.getMessage(), throwable);
 		}
 		finally {
 			// failsafe shutdown of the client actor

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 169971d..9648a8f 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -1028,8 +1029,12 @@ public class ExecutionGraph implements Serializable {
 	
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
 		if (jobStatusListenerActors.size() > 0) {
+			SerializedThrowable serializedThrowable = null;
+			if(error != null) {
+				serializedThrowable = new SerializedThrowable(error);
+			}
 			ExecutionGraphMessages.JobStatusChanged message =
-					new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(), error);
+					new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(), serializedThrowable);
 
 			for (ActorGateway listener: jobStatusListenerActors) {
 				listener.tell(message);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index fb5e63f..5045ca4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -33,6 +33,6 @@ public interface OperatorStateCarrier<T extends StateHandle<?>> {
 	 * 
 	 * @param stateHandle The handle to the state.
 	 */
-	public void setInitialState(T stateHandle) throws Exception;
+	void setInitialState(T stateHandle) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index bf2dca8..7ecfe62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.util.InstantiationUtil;
+
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -56,9 +58,9 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
 	protected abstract InputStream getInputStream() throws Exception;
 
 	@Override
-	public Serializable getState() throws Exception {
+	public Serializable getState(ClassLoader userCodeClassLoader) throws Exception {
 		if (!stateFetched()) {
-			ObjectInputStream stream = new ObjectInputStream(getInputStream());
+			ObjectInputStream stream = new InstantiationUtil.ClassLoaderObjectInputStream(getInputStream(), userCodeClassLoader);
 			try {
 				state = (Serializable) stream.readObject();
 			} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index 5ba372d..1b524d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -34,7 +34,8 @@ public class LocalStateHandle<T extends Serializable> implements StateHandle<T>
 	}
 
 	@Override
-	public T getState() {
+	public T getState(ClassLoader userCodeClassLoader) {
+		// The object has been deserialized correctly before
 		return state;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
index b6981c3..9ec748b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
@@ -24,7 +24,8 @@ import java.util.Map;
 /**
  * Wrapper for storing the handles for each state in a partitioned form. It can
  * be used to repartition the state before re-injecting to the tasks.
- * 
+ *
+ * TODO: This class needs testing!
  */
 public class PartitionedStateHandle implements
 		StateHandle<Map<Serializable, StateHandle<Serializable>>> {
@@ -38,7 +39,7 @@ public class PartitionedStateHandle implements
 	}
 
 	@Override
-	public Map<Serializable, StateHandle<Serializable>> getState() throws Exception {
+	public Map<Serializable, StateHandle<Serializable>> getState(ClassLoader userCodeClassLoader) throws Exception {
 		return handles;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
index c1342b8..53d5765 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -28,12 +28,14 @@ import java.io.Serializable;
 public interface StateHandle<T> extends Serializable {
 
 	/**
-	 * This retrieves and return the state represented by the handle. 
-	 * 
+	 * Retrieves and returns the state represented by the handle.
+	 *
+	 * @param userCodeClassLoader Class loader for deserializing user code specific classes
+	 *
 	 * @return The state represented by the handle.
 	 * @throws java.lang.Exception Thrown, if the state cannot be fetched.
 	 */
-	T getState() throws Exception;
+	T getState(ClassLoader userCodeClassLoader) throws Exception;
 	
 	/**
 	 * Discards the state referred to by this handle, to free up resources in

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 0637017..07b7ee8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import java.util.Arrays;
-
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
  * This class represents an update about a task's execution state.
@@ -47,11 +44,7 @@ public class TaskExecutionState implements java.io.Serializable {
 
 	private final ExecutionState executionState;
 
-	private final byte[] serializedError;
-
-	// The exception must not be (de)serialized with the class, as its
-	// class may not be part of the system class loader.
-	private transient Throwable cachedError;
+	private final SerializedThrowable throwable;
 
 	/** Serialized flink and user-defined accumulators */
 	private final AccumulatorSnapshot accumulators;
@@ -104,49 +97,19 @@ public class TaskExecutionState implements java.io.Serializable {
 			ExecutionState executionState, Throwable error,
 			AccumulatorSnapshot accumulators) {
 
-
-			if (jobID == null || executionId == null || executionState == null) {
+		if (jobID == null || executionId == null || executionState == null) {
 			throw new NullPointerException();
 		}
 
 		this.jobID = jobID;
 		this.executionId = executionId;
 		this.executionState = executionState;
-		this.cachedError = error;
-		this.accumulators = accumulators;
-
-		if (error != null) {
-			byte[] serializedError;
-			try {
-				serializedError = InstantiationUtil.serializeObject(error);
-			}
-			catch (Throwable t) {
-				// could not serialize exception. send the stringified version instead
-				try {
-					this.cachedError = new Exception(ExceptionUtils.stringifyException(error));
-					serializedError = InstantiationUtil.serializeObject(this.cachedError);
-				}
-				catch (Throwable tt) {
-					// seems like we cannot do much to report the actual exception
-					// report a placeholder instead
-					try {
-						this.cachedError = new Exception("Cause is a '" + error.getClass().getName()
-								+ "' (failed to serialize or stringify)");
-						serializedError = InstantiationUtil.serializeObject(this.cachedError);
-					}
-					catch (Throwable ttt) {
-						// this should never happen unless the JVM is fubar.
-						// we just report the state without the error
-						this.cachedError = null;
-						serializedError = null;
-					}
-				}
-			}
-			this.serializedError = serializedError;
-		}
-		else {
-			this.serializedError = null;
+		if(error != null) {
+			this.throwable = new SerializedThrowable(error);
+		} else {
+			this.throwable = null;
 		}
+		this.accumulators = accumulators;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -160,19 +123,11 @@ public class TaskExecutionState implements java.io.Serializable {
 	 *                            job this update refers to.
 	 */
 	public Throwable getError(ClassLoader usercodeClassloader) {
-		if (this.serializedError == null) {
+		if (this.throwable == null) {
 			return null;
+		} else {
+			return throwable.deserializeError(usercodeClassloader);
 		}
-
-		if (this.cachedError == null) {
-			try {
-				cachedError = (Throwable) InstantiationUtil.deserializeObject(this.serializedError, usercodeClassloader);
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Error while deserializing the attached exception", e);
-			}
-		}
-		return this.cachedError;
 	}
 
 	/**
@@ -218,8 +173,8 @@ public class TaskExecutionState implements java.io.Serializable {
 			return other.jobID.equals(this.jobID) &&
 					other.executionId.equals(this.executionId) &&
 					other.executionState == this.executionState &&
-					(other.serializedError == null ? this.serializedError == null :
-						(this.serializedError != null && Arrays.equals(this.serializedError, other.serializedError)));
+					(other.throwable == null ? this.throwable == null :
+						(this.throwable != null && throwable.equals(other.throwable) ));
 		}
 		else {
 			return false;
@@ -235,7 +190,6 @@ public class TaskExecutionState implements java.io.Serializable {
 	public String toString() {
 		return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s", 
 				jobID, executionId, executionState,
-				cachedError == null ? (serializedError == null ? "(null)" : "(serialized)")
-									: (cachedError.getClass().getName() + ": " + cachedError.getMessage()));
+				throwable == null ? "(null)" : throwable.toString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
new file mode 100644
index 0000000..6e5a558
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Serializable carrier for a {@link Throwable} that holds the exception as a byte array.
+ * Needed to send around user-specific exception classes with Akka.
+ *
+ * <p>If the exception cannot be serialized, the constructor falls back to an exception
+ * holding its stringified form, then to a placeholder naming its class, and finally to
+ * carrying no exception at all.
+ */
+public class SerializedThrowable extends Exception implements Serializable {
+	private static final long serialVersionUID = 7284183123441947635L;
+
+	/** The serialized exception; null only if every serialization attempt failed. */
+	private final byte[] serializedError;
+
+	// The exception must not be (de)serialized with the class, as its
+	// class may not be part of the system class loader.
+	private transient Throwable cachedError;
+
+	/**
+	 * Create a new SerializedThrowable.
+	 *
+	 * @param error The exception to serialize.
+	 * @throws NullPointerException Thrown, if the given exception is null.
+	 */
+	public SerializedThrowable(Throwable error) {
+		Preconditions.checkNotNull(error, "The exception to serialize has to be set");
+		this.cachedError = error;
+		byte[] serializedError;
+		try {
+			serializedError = InstantiationUtil.serializeObject(error);
+		}
+		catch (Throwable t) {
+			// could not serialize exception. send the stringified version instead
+			try {
+				this.cachedError = new Exception(ExceptionUtils.stringifyException(error));
+				serializedError = InstantiationUtil.serializeObject(this.cachedError);
+			}
+			catch (Throwable tt) {
+				// seems like we cannot do much to report the actual exception
+				// report a placeholder instead
+				try {
+					this.cachedError = new Exception("Cause is a '" + error.getClass().getName()
+							+ "' (failed to serialize or stringify)");
+					serializedError = InstantiationUtil.serializeObject(this.cachedError);
+				}
+				catch (Throwable ttt) {
+					// this should never happen unless the JVM is fubar.
+					// we just report the state without the error
+					this.cachedError = null;
+					serializedError = null;
+				}
+			}
+		}
+		this.serializedError = serializedError;
+	}
+
+	/**
+	 * Returns the wrapped exception, deserializing it first if no deserialized copy is cached.
+	 *
+	 * @param userCodeClassloader Class loader for deserializing user code specific classes
+	 * @return The wrapped exception, or null if no serialized exception is available.
+	 * @throws RuntimeException Thrown, if the exception cannot be deserialized.
+	 */
+	public Throwable deserializeError(ClassLoader userCodeClassloader) {
+		if (this.cachedError == null) {
+			if (this.serializedError == null) {
+				// nothing could be serialized in the constructor; there is no error to report
+				return null;
+			}
+			try {
+				cachedError = (Throwable) InstantiationUtil.deserializeObject(this.serializedError, userCodeClassloader);
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Error while deserializing the attached exception", e);
+			}
+		}
+		return this.cachedError;
+	}
+
+	/** Two instances are equal if their serialized exceptions are byte-wise equal. */
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof SerializedThrowable) {
+			return Arrays.equals(this.serializedError, ((SerializedThrowable) obj).serializedError);
+		}
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		// must stay consistent with equals(), which compares the serialized bytes
+		return Arrays.hashCode(serializedError);
+	}
+
+	@Override
+	public String toString() {
+		if (cachedError != null) {
+			return cachedError.getClass().getName() + ": " + cachedError.getMessage();
+		}
+		if (serializedError == null) {
+			return "(null)"; // only if the constructor could not serialize anything
+		} else {
+			return "(serialized)";
+		}
+	}
+
+	/**
+	 * Unwraps the given throwable if it is a SerializedThrowable.
+	 *
+	 * @param serThrowable The throwable to unwrap.
+	 * @param loader Class loader for deserializing user code specific classes
+	 * @return The deserialized exception for a SerializedThrowable, otherwise the argument itself.
+	 */
+	public static Throwable get(Throwable serThrowable, ClassLoader loader) {
+		if (serThrowable instanceof SerializedThrowable) {
+			return ((SerializedThrowable) serThrowable).deserializeError(loader);
+		} else {
+			return serThrowable;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 839fdb4..a67c23a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -47,11 +47,10 @@ import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.util.ZooKeeperUtil
-import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.util.{SerializedThrowable, ZooKeeperUtil, EnvironmentInformation}
 import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessages}
-import org.apache.flink.runtime.{LogMessages}
+import org.apache.flink.runtime.LogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, InstanceManager}
@@ -327,8 +326,12 @@ class JobManager(
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
 
-          log.info(s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
-            error)
+          val deserializedError = if(error != null) {
+            error.deserializeError(executionGraph.getUserClassLoader)
+          } else null
+          log.info(
+            s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
+            deserializedError)
 
           if (newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
@@ -343,8 +346,10 @@ class JobManager(
                     log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
                     Collections.emptyMap()
                 }
-                val result = new SerializedJobExecutionResult(jobID, jobInfo.duration,
-                                                              accumulatorResults)
+                val result = new SerializedJobExecutionResult(
+                  jobID,
+                  jobInfo.duration,
+                  accumulatorResults)
                 jobInfo.client ! decorateMessage(JobResultSuccess(result))
 
               case JobStatus.CANCELED =>
@@ -352,9 +357,8 @@ class JobManager(
                   Failure(
                     new JobCancellationException(
                       jobID,
-                    "Job was cancelled.", error)
-                  )
-                )
+                      "Job was cancelled.",
+                      new SerializedThrowable(deserializedError))))
 
               case JobStatus.FAILED =>
                 jobInfo.client ! decorateMessage(
@@ -362,14 +366,11 @@ class JobManager(
                     new JobExecutionException(
                       jobID,
                       "Job execution failed.",
-                      error)
-                  )
-                )
+                      new SerializedThrowable(deserializedError))))
 
               case x =>
-                val exception = new JobExecutionException(jobID, s"$x is not a " +
-                  "terminal state.")
-                jobInfo.client ! decorateMessage(Failure(exception))
+                val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
+                jobInfo.client ! decorateMessage(Failure(new SerializedThrowable(exception)))
                 throw exception
             }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index 0cb3b0d..ce6fdf3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID}
+import org.apache.flink.runtime.util.SerializedThrowable
 
 /**
  * This object contains the execution graph specific messages.
@@ -74,13 +75,13 @@ object ExecutionGraphMessages {
    * @param jobID identifying the corresponding job
    * @param newJobStatus
    * @param timestamp
-   * @param error
+   * @param serializedError
    */
   case class JobStatusChanged(
       jobID: JobID,
       newJobStatus: JobStatus,
       timestamp: Long,
-      error: Throwable)
+      serializedError: SerializedThrowable)
     extends RequiresLeaderSessionID {
     override def toString: String = {
       s"${timestampToString(timestamp)}\tJob execution switched to status $newJobStatus."

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
index e0dce35..ac37493 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.messages
 
 import org.apache.flink.runtime.jobgraph.JobGraph
+import org.apache.flink.runtime.util.SerializedThrowable
 
 /**
  * This object contains the [[org.apache.flink.runtime.client.JobClient]] specific messages

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 7c57233..5925c96 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
-import org.apache.flink.api.common.JobSubmissionResult
+import org.apache.flink.api.common.{JobExecutionResult, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -238,9 +238,7 @@ abstract class FlinkMiniCluster(
   }
 
   @throws(classOf[JobExecutionException])
-  def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean)
-                                                                : SerializedJobExecutionResult = {
-
+  def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean): JobExecutionResult = {
     submitJobAndWait(jobGraph, printUpdates, timeout)
   }
   
@@ -249,7 +247,7 @@ abstract class FlinkMiniCluster(
       jobGraph: JobGraph,
       printUpdates: Boolean,
       timeout: FiniteDuration)
-    : SerializedJobExecutionResult = {
+    : JobExecutionResult = {
 
     val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
     else JobClient.startJobClientActorSystem(configuration)
@@ -259,7 +257,8 @@ abstract class FlinkMiniCluster(
       getJobManagerGateway(),
       jobGraph,
       timeout,
-      printUpdates)
+      printUpdates,
+      this.getClass.getClassLoader)
   }
 
   @throws(classOf[JobExecutionException])

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index cf81e3e..e9f3a62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -105,7 +105,7 @@ public class PartialConsumePipelinedResultTest {
 				flink.getJobManagerGateway(),
 				jobGraph,
 				TestingUtils.TESTING_DURATION(),
-				false);
+				false, this.getClass().getClassLoader());
 	}
 
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 597249a..1e5b12a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -83,7 +83,7 @@ public class CheckpointMessagesTest {
 		private static final long serialVersionUID = 8128146204128728332L;
 
 		@Override
-		public Serializable getState() {
+		public Serializable getState(ClassLoader userCodeClassLoader) {
 			return null;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
index a7378b9..c667139 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
@@ -36,6 +36,7 @@ public class ByteStreamStateHandleTest {
 
 	@Test
 	public void testHandle() throws Exception {
+		final ClassLoader cl = this.getClass().getClassLoader();
 		MockHandle handle;
 
 		try {
@@ -47,14 +48,14 @@ public class ByteStreamStateHandleTest {
 
 		handle = new MockHandle(1);
 
-		assertEquals(1, handle.getState());
+		assertEquals(1, handle.getState(cl));
 		assertTrue(handle.stateFetched());
 		assertFalse(handle.isWritten());
 		assertFalse(handle.discarded);
 
 		MockHandle handleDs = serializeDeserialize(handle);
 
-		assertEquals(1, handle.getState());
+		assertEquals(1, handle.getState(cl));
 		assertTrue(handle.stateFetched());
 		assertTrue(handle.isWritten());
 		assertTrue(handle.generatedOutput);
@@ -66,7 +67,7 @@ public class ByteStreamStateHandleTest {
 		assertFalse(handle.discarded);
 
 		try {
-			handleDs.getState();
+			handleDs.getState(cl);
 			fail();
 		} catch (UnsupportedOperationException e) {
 			// good

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0c9f4a8..3238dd5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGra
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.apache.flink.runtime.util.SerializedThrowable
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -84,11 +85,13 @@ class JobManagerITCase(_system: ActorSystem)
 
         within(2 second) {
           val response = expectMsgType[Failure]
-          val exception = response.cause
+          val exception = SerializedThrowable.get(response.cause, this.getClass.getClassLoader)
           exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
-              new NoResourceAvailableException(1,1,0) should equal(e.getCause)
+              val cause = e.getCause.asInstanceOf[SerializedThrowable].deserializeError(
+                this.getClass.getClassLoader)
+              new NoResourceAvailableException(1,1,0) should equal(cause)
             case e => fail(s"Received wrong exception of type $e.")
           }
         }
@@ -261,8 +264,9 @@ class JobManagerITCase(_system: ActorSystem)
 
           expectMsg(Success(jobGraph.getJobID))
           val failure = expectMsgType[Failure]
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
 
-          failure.cause match {
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 
@@ -388,8 +392,8 @@ class JobManagerITCase(_system: ActorSystem)
           expectMsg(Success(jobGraph.getJobID))
 
           val failure = expectMsgType[Failure]
-
-          failure.cause match {
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 
@@ -434,9 +438,10 @@ class JobManagerITCase(_system: ActorSystem)
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
           expectMsg(Success(jobGraph.getJobID))
-          val failure = expectMsgType[Failure]
 
-          failure.cause match {
+          val failure = expectMsgType[Failure]
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 
@@ -473,9 +478,10 @@ class JobManagerITCase(_system: ActorSystem)
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, false), self)
           expectMsg(Success(jobGraph.getJobID))
-          val failure = expectMsgType[Failure]
 
-          failure.cause match {
+          val failure = expectMsgType[Failure]
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 
@@ -515,9 +521,10 @@ class JobManagerITCase(_system: ActorSystem)
 
           jmGateway.tell(SubmitJob(jobGraph, false), self)
           expectMsg(Success(jobGraph.getJobID))
-          val failure = expectMsgType[Failure]
 
-          failure.cause match {
+          val failure = expectMsgType[Failure]
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 
@@ -562,9 +569,10 @@ class JobManagerITCase(_system: ActorSystem)
 
           jmGateway.tell(SubmitJob(jobGraph, false), self)
           expectMsg(Success(jobGraph.getJobID))
-          val failure = expectMsgType[Failure]
 
-          failure.cause match {
+          val failure = expectMsgType[Failure]
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 7e4bf03..7017c33 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.apache.flink.runtime.util.SerializedThrowable
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -85,8 +86,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
           taskManagers(0) ! PoisonPill
 
           val failure = expectMsgType[Failure]
-
-          failure.cause match {
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
             case e => fail(s"Received wrong exception $e.")
@@ -133,8 +134,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
           taskManagers(0) ! Kill
 
           val failure = expectMsgType[Failure]
-
-          failure.cause match {
+          val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
index ec414c0..a8734e6 100644
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -114,7 +114,7 @@ public class FileStateHandleTest {
 		assertFalse(deserializedHandle.stateFetched());
 
 		// Fetch the and compare with original
-		assertEquals(state, deserializedHandle.getState());
+		assertEquals(state, deserializedHandle.getState(this.getClass().getClassLoader()));
 
 		// Test whether discard removes the checkpoint file properly
 		assertTrue(hdfs.listFiles(hdPath, true).hasNext());

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 438d529..dae1bf0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -78,9 +78,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 	@Override
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
+
 		// Restore state using the Checkpointed interface
 		if (userFunction instanceof Checkpointed && snapshots.f0 != null) {
-			((Checkpointed) userFunction).restoreState(snapshots.f0.getState());
+			((Checkpointed) userFunction).restoreState(snapshots.f0.getState(runtimeContext.getUserCodeClassLoader()));
 		}
 		
 		if (snapshots.f1 != null) {
@@ -88,7 +89,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 			for (Entry<String, OperatorStateHandle> snapshot : snapshots.f1.entrySet()) {
 				StreamOperatorState restoredOpState = runtimeContext.getState(snapshot.getKey(), snapshot.getValue().isPartitioned());
 				StateHandle<Serializable> checkpointHandle = snapshot.getValue();
-				restoredOpState.restoreState(checkpointHandle);
+				restoredOpState.restoreState(checkpointHandle, runtimeContext.getUserCodeClassLoader());
 			}
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
index 213303a..3277b3f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
@@ -69,10 +69,10 @@ public class EagerStateStore<S, C extends Serializable> implements PartitionedSt
 	}
 
 	@Override
-	public void restoreStates(StateHandle<Serializable> snapshot) throws Exception {
+	public void restoreStates(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
 		
 		@SuppressWarnings("unchecked")
-		Map<Serializable, C> checkpoints = (Map<Serializable, C>) snapshot.getState();
+		Map<Serializable, C> checkpoints = (Map<Serializable, C>) snapshot.getState(userCodeClassLoader);
 		
 		// we map the values back to the state from the checkpoints
 		for (Entry<Serializable, C> snapshotEntry : checkpoints.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
index f308ba8..0c0b2c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
@@ -38,8 +38,8 @@ public class OperatorStateHandle implements StateHandle<Serializable> {
 	}
 
 	@Override
-	public Serializable getState() throws Exception {
-		return handle.getState();
+	public Serializable getState(ClassLoader userCodeClassLoader) throws Exception {
+		return handle.getState(userCodeClassLoader);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
index 5201058..e9a02c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
@@ -43,7 +43,7 @@ public interface PartitionedStateStore<S, C extends Serializable> {
 
 	StateHandle<Serializable> snapshotStates(long checkpointId, long checkpointTimestamp) throws Exception;
 
-	void restoreStates(StateHandle<Serializable> snapshot) throws Exception;
+	void restoreStates(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception;
 
 	boolean containsKey(Serializable key);
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index b165a94..e9ebfb6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -128,8 +128,8 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 	}
 
 	@Override
-	public void restoreState(StateHandle<Serializable> snapshots) throws Exception {
-		stateStore.restoreStates(snapshots);
+	public void restoreState(StateHandle<Serializable> snapshots, ClassLoader userCodeClassLoader) throws Exception {
+		stateStore.restoreStates(snapshots, userCodeClassLoader);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index 6e0a3ea..29a19b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -96,8 +96,8 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 	}
 
 	@SuppressWarnings("unchecked")
-	public void restoreState(StateHandle<Serializable> snapshot) throws Exception {
-		update((S) checkpointer.restoreState((C) snapshot.getState()));
+	public void restoreState(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
+		update(checkpointer.restoreState((C) snapshot.getState(userCodeClassLoader)));
 	}
 
 	public Map<Serializable, S> getPartitionedState() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
index 27c697a..9105fd2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
@@ -42,7 +42,8 @@ public class WrapperStateHandle extends LocalStateHandle<Serializable> {
 	@Override
 	public void discardState() throws Exception {
 		@SuppressWarnings("unchecked")
-		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) getState();
+		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates =
+				(List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) getState(null); // we can pass "null" here because the LocalStateHandle does not use the ClassLoader anyway
 		for (Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state : chainedStates) {
 			if (state != null) {
 				if (state.f0 != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ffd04e6..a70fb31 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -106,7 +106,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	
 	public StreamTask() {
 		checkpointBarrierListener = new CheckpointBarrierListener();
-		contexts = new ArrayList<StreamingRuntimeContext>();
+		contexts = new ArrayList<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -271,7 +271,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 		// We retrieve end restore the states for the chained operators.
 		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = 
-				(List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState();
+				(List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState(this.userClassLoader);
 
 		// We restore all stateful operators
 		for (int i = 0; i < chainedStates.size(); i++) {
@@ -358,7 +358,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 		// If the user did not specify a provider in the program we try to get it from the config
 		if (provider == null) {
-			String backendName = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND,
+			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
+			String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND,
 					ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase();
 
 			StateBackend backend;
@@ -372,9 +373,9 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 			switch (backend) {
 				case JOBMANAGER:
 					LOG.info("State backend for state checkpoints is set to jobmanager.");
-					return new LocalStateHandle.LocalStateHandleProvider<Serializable>();
+					return new LocalStateHandle.LocalStateHandleProvider<>();
 				case FILESYSTEM:
-					String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
+					String checkpointDir = flinkConfig.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
 					if (checkpointDir != null) {
 						LOG.info("State backend for state checkpoints is set to filesystem with directory: "
 								+ checkpointDir);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
index 38117e8..d6a8a54 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
@@ -45,7 +45,7 @@ public class StateHandleTest {
 		MockHandle<Serializable> h1 = new MockHandle<Serializable>(1);
 
 		OperatorStateHandle opHandle = new OperatorStateHandle(h1, true);
-		assertEquals(1, opHandle.getState());
+		assertEquals(1, opHandle.getState(this.getClass().getClassLoader()));
 
 		OperatorStateHandle dsHandle = serializeDeserialize(opHandle);
 		MockHandle<Serializable> h2 = (MockHandle<Serializable>) dsHandle.getHandle();
@@ -60,6 +60,7 @@ public class StateHandleTest {
 
 	@Test
 	public void wrapperStateHandleTest() throws Exception {
+		final ClassLoader cl = this.getClass().getClassLoader();
 
 		MockHandle<Serializable> h1 = new MockHandle<Serializable>(1);
 		MockHandle<Serializable> h2 = new MockHandle<Serializable>(2);
@@ -82,16 +83,16 @@ public class StateHandleTest {
 
 		@SuppressWarnings("unchecked")
 		Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> dsFullState = ((List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) dsWrapper
-				.getState()).get(0);
+				.getState(cl)).get(0);
 
 		Map<String, OperatorStateHandle> dsOpHandles = dsFullState.f1;
 
-		assertNull(dsFullState.f0.getState());
+		assertNull(dsFullState.f0.getState(cl));
 		assertFalse(((MockHandle<?>) dsFullState.f0).discarded);
 		assertFalse(((MockHandle<?>) dsOpHandles.get("h1").getHandle()).discarded);
-		assertNull(dsOpHandles.get("h1").getState());
+		assertNull(dsOpHandles.get("h1").getState(cl));
 		assertFalse(((MockHandle<?>) dsOpHandles.get("h2").getHandle()).discarded);
-		assertNull(dsOpHandles.get("h2").getState());
+		assertNull(dsOpHandles.get("h2").getState(cl));
 
 		dsWrapper.discardState();
 
@@ -126,7 +127,7 @@ public class StateHandleTest {
 		}
 
 		@Override
-		public T getState() {
+		public T getState(ClassLoader userCodeClassLoader) {
 			return state;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 32b3455..c19c548 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -52,7 +52,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
 
@@ -170,9 +169,9 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 		}, context);
 
 		if (serializedState != null) {
+			ClassLoader cl = Thread.currentThread().getContextClassLoader();
 			op.restoreInitialState((Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>) InstantiationUtil
-					.deserializeObject(serializedState, Thread.currentThread()
-							.getContextClassLoader()));
+					.deserializeObject(serializedState, cl));
 		}
 
 		op.open(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index fbcda1c..8e9e56a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -73,8 +73,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 		}
 		try {
 			sync = true;
-			SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
-			latestResult = result.toJobExecutionResult(getClass().getClassLoader());
+			latestResult = executor.submitJobAndWait(jobGraph, false);
 			return latestResult;
 		} catch (JobExecutionException e) {
 			if (e.getMessage().contains("GraphConversionException")) {
@@ -116,8 +115,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 			jobRunner = new Thread() {
 				public void run() {
 					try {
-						SerializedJobExecutionResult result = cluster.submitJobAndWait(jobGraph, false);
-						latestResult = result.toJobExecutionResult(getClass().getClassLoader());
+						latestResult = cluster.submitJobAndWait(jobGraph, false);
 					} catch (JobExecutionException e) {
 						// TODO remove: hack to make ITCase succeed because .submitJobAndWait() throws exception on .stop() (see this.shutdown())
 						latestResult = new JobExecutionResult(null, 0, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index d23469e..bd5400d 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -27,7 +27,6 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import org.junit.Assert;
@@ -121,8 +120,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
 			Assert.assertNotNull("Obtained null JobGraph", jobGraph);
 
 			try {
-				SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
-				this.jobExecutionResult = result.toJobExecutionResult(getClass().getClassLoader());
+				this.jobExecutionResult = executor.submitJobAndWait(jobGraph, false);
 			}
 			catch (Exception e) {
 				System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 80df0f8..1812422 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -28,7 +28,6 @@ import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Assert;
 
@@ -51,9 +50,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 			JobGraphGenerator jgg = new JobGraphGenerator();
 			JobGraph jobGraph = jgg.compileJobGraph(op);
 			
-			SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
-
-			this.lastJobExecutionResult = result.toJobExecutionResult(getClass().getClassLoader());
+			this.lastJobExecutionResult = executor.submitJobAndWait(jobGraph, false);
 			return this.lastJobExecutionResult;
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 1488f00..f410827 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -387,6 +387,25 @@ under the License.
 							</descriptors>
 						</configuration>
 					</execution>
+					<execution>
+						<id>create-streaming-state-checkpointed-classloader-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>streaming-checkpointed-classloader</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-streaming-state-checkpointed-classloader-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-tests/src/test/assembly/test-streaming-state-checkpointed-classloader-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-streaming-state-checkpointed-classloader-assembly.xml b/flink-tests/src/test/assembly/test-streaming-state-checkpointed-classloader-assembly.xml
new file mode 100644
index 0000000..e5682ae
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-streaming-state-checkpointed-classloader-assembly.xml
@@ -0,0 +1,39 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.class</include>
+				<include>org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram$*.class</include>
+				<include>org/apache/flink/test/testdata/WordCountData.class</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index f3c061c..e43a9e4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -27,7 +27,9 @@ import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class ClassLoaderITCase {
 
@@ -35,14 +37,25 @@ public class ClassLoaderITCase {
 
 	private static final String STREAMING_PROG_JAR_FILE = "target/streamingclassloader-test-jar.jar";
 
+	private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "target/streaming-checkpointed-classloader-test-jar.jar";
+
 	private static final String KMEANS_JAR_PATH = "target/kmeans-test-jar.jar";
 
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
 	@Test
-	public void testJobWithCustomInputFormat() {
+	public void testJobsWithCustomClassLoader() {
 		try {
+
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
+
+			// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
+			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+			config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, "file://" + folder.newFolder().getAbsolutePath());
 
 			ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false);
 
@@ -57,10 +70,28 @@ public class ClassLoaderITCase {
 									} );
 				inputSplitTestProg.invokeInteractiveModeForExecution();
 
+				// regular streaming job
 				PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE),
 						new String[] { STREAMING_PROG_JAR_FILE, "localhost", String.valueOf(port) } );
 				streamingProg.invokeInteractiveModeForExecution();
 
+				// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
+				// the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
+				try {
+					PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE),
+							new String[]{STREAMING_CHECKPOINTED_PROG_JAR_FILE, "localhost", String.valueOf(port)});
+					streamingCheckpointedProg.invokeInteractiveModeForExecution();
+				} catch(Exception e) {
+					// we cannot access the SuccessException here when executing the tests with Maven, because it's not available in the jar.
+					try {
+						if (!(e.getCause().getCause().getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"))) {
+							throw e;
+						}
+					} catch(Throwable ignore) {
+						throw e;
+					}
+				}
+
 				PackagedProgram kMeansProg = new PackagedProgram(new File(KMEANS_JAR_PATH),
 						new String[] { KMEANS_JAR_PATH,
 										"localhost",
@@ -81,4 +112,5 @@ public class ClassLoaderITCase {
 			Assert.fail(e.getMessage());
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
new file mode 100644
index 0000000..47253da
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.lang.RuntimeException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * A simple streaming program that uses Flink's state checkpointing with a user defined
+ * class ({@link StatefulMapper}) as the checkpointed state.
+ *
+ * <p>The mapper throws an intended failure once a checkpoint has completed, and throws a
+ * {@link SuccessException} when it is invoked after its state has been restored.
+ */
+@SuppressWarnings("serial")
+public class CheckpointedStreamingProgram {
+
+	/** Checkpoint interval in milliseconds; also the throttling sleep time used by the mapper. */
+	private static final int CHECKPOINT_INTERVAL = 100;
+
+	/**
+	 * Builds the job and executes it on a remote environment.
+	 *
+	 * @param args {@code args[0]} jar file, {@code args[1]} host, {@code args[2]} port
+	 * @throws Exception anything thrown while setting up or executing the job
+	 */
+	public static void main(String[] args) throws Exception {
+		// Debug output. Print the header before the entries, and only inspect the system class
+		// loader when it actually is a URLClassLoader (it is not on Java 9 and later), so that
+		// this diagnostic can never abort the program with a ClassCastException.
+		System.out.println("CheckpointedStreamingProgram classpath: ");
+		ClassLoader cl = ClassLoader.getSystemClassLoader();
+		if (cl instanceof URLClassLoader) {
+			for (URL url : ((URLClassLoader) cl).getURLs()) {
+				System.out.println(url.getFile());
+			}
+		}
+
+		final String jarFile = args[0];
+		final String host = args[1];
+		final int port = Integer.parseInt(args[2]);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(CHECKPOINT_INTERVAL);
+		env.setNumberOfExecutionRetries(1);
+		env.disableOperatorChaining();
+
+		DataStream<String> text = env.addSource(new SimpleStringGenerator());
+		text.map(new StatefulMapper()).addSink(new NoOpSink());
+		env.setParallelism(1);
+		env.execute("Checkpointed Streaming Program");
+	}
+
+	/**
+	 * Source that emits the constant string {@code "someString"} until it is cancelled.
+	 * It takes part in checkpointing, but its snapshot is always {@code null}.
+	 */
+	public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
+
+		// volatile: cancel() may be invoked from a different thread than the one executing run()
+		public volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			while (running) {
+				Thread.sleep(1);
+				ctx.collect("someString");
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return null;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			// nothing to restore, snapshotState() never returns any state
+		}
+	}
+
+	/**
+	 * Identity mapper that uses itself as the checkpointed state and drives the test scenario:
+	 * it throws an intended failure once a checkpoint has completed and no restore has happened
+	 * yet, and throws a {@link SuccessException} when invoked after a restore.
+	 */
+	public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointNotifier {
+
+		private String someState;
+		private boolean atLeastOneSnapshotComplete = false;
+		private boolean restored = false;
+
+		@Override
+		public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return this;
+		}
+
+		@Override
+		public void restoreState(StatefulMapper state) {
+			restored = true;
+			this.someState = state.someState;
+			this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
+		}
+
+		@Override
+		public String map(String value) throws Exception {
+			if (!atLeastOneSnapshotComplete) {
+				// throttle consumption by the checkpoint interval until we have one snapshot.
+				Thread.sleep(CHECKPOINT_INTERVAL);
+			}
+			if (atLeastOneSnapshotComplete && !restored) {
+				throw new RuntimeException("Intended failure, to trigger restore");
+			}
+			if (restored) {
+				// user defined exception type, thrown once the state has been restored
+				throw new SuccessException();
+			}
+			someState = value; // update our state
+			return value;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			atLeastOneSnapshotComplete = true;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * We intentionally use a user specified failure exception.
+	 */
+	public static class SuccessException extends Exception {
+
+	}
+
+	/** Sink that discards every record. */
+	public static class NoOpSink implements SinkFunction<String>{
+		@Override
+		public void invoke(String value) throws Exception {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf8c8e54/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 2cf5598..2bde833 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.Tasks;
@@ -48,7 +47,7 @@ public class JobSubmissionFailsITCase {
 	
 	private static final int NUM_SLOTS = 20;
 	
-	private static ForkableFlinkMiniCluster cluser;
+	private static ForkableFlinkMiniCluster cluster;
 	private static JobGraph workingJobGraph;
 
 	@BeforeClass
@@ -59,7 +58,7 @@ public class JobSubmissionFailsITCase {
 			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
 			
-			cluser = new ForkableFlinkMiniCluster(config);
+			cluster = new ForkableFlinkMiniCluster(config);
 			
 			final JobVertex jobVertex = new JobVertex("Working job vertex.");
 			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
@@ -74,7 +73,7 @@ public class JobSubmissionFailsITCase {
 	@AfterClass
 	public static void teardown() {
 		try {
-			cluser.shutdown();
+			cluster.shutdown();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -100,13 +99,11 @@ public class JobSubmissionFailsITCase {
 	
 	private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
 		if (detached) {
-			cluser.submitJobDetached(jobGraph);
+			cluster.submitJobDetached(jobGraph);
 			return null;
 		}
 		else {
-			SerializedJobExecutionResult result = cluser.submitJobAndWait(
-												jobGraph, false, TestingUtils.TESTING_DURATION());
-			return result.toJobExecutionResult(getClass().getClassLoader());
+			return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
 		}
 	}
 
@@ -130,7 +127,7 @@ public class JobSubmissionFailsITCase {
 				fail("Caught wrong exception of type " + t.getClass() + ".");
 			}
 
-			cluser.submitJobAndWait(workingJobGraph, false);
+			cluster.submitJobAndWait(workingJobGraph, false);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -155,7 +152,7 @@ public class JobSubmissionFailsITCase {
 				fail("Caught wrong exception of type " + t.getClass() + ".");
 			}
 	
-			cluser.submitJobAndWait(workingJobGraph, false);
+			cluster.submitJobAndWait(workingJobGraph, false);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -178,7 +175,7 @@ public class JobSubmissionFailsITCase {
 				fail("Caught wrong exception of type " + t.getClass() + ".");
 			}
 
-			cluser.submitJobAndWait(workingJobGraph, false);
+			cluster.submitJobAndWait(workingJobGraph, false);
 		}
 		catch (Exception e) {
 			e.printStackTrace();