You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/04/01 18:10:03 UTC

flink git commit: [FLINK-3633] Fix user code de/serialization in ExecutionConfig

Repository: flink
Updated Branches:
  refs/heads/master 82b5824b9 -> 0331882af


[FLINK-3633] Fix user code de/serialization in ExecutionConfig

FLINK-3327 moved the ExecutionConfig directly to the JobGraph so that it was serialized
and deserialized using the system class loader when sending a SubmitJob message to the
JobManager. This is problematic since the ExecutionConfig can contain user code class
which require a user code class loader for deserialization. In order to circumvent the
problem, a UserCodeValue class was introduced which automatically sends the wrapped value
as a byte array. On the receiving side, the wrapped value has to be explicitly deserialized
providing a class loader.

To test the feature the ScalaShellITCase.testSubmissionOfExternalLibrary was adapted
to register org.apache.flink.ml.math.Vector at the ExecutionConfig.

This commit also re-introduces the removed ExecutionConfig.CONFIG_KEY key, so that
version 1.1 does not break the API.

Updating documentation to include new task.cancellation-interval parameter

Make globalJobParameters a UserCodeValue, fix wrong exception message, fix formatting regression

Rework serialization of user code object in ExecutionConfig

Due to the tight coupling of the ExecutionConfig and multiple Flink components
(e.g. PojoSerializer) the automatic serialization and manual deserialization of
user code objects via the UserCodeValue class caused problems. In order to minimize
the impact of the changes, I changed the serialization strategy to an explicit one.
One has to call serializeUserCode to store the user code objects in a SerializeValue
object and nulling corresponding member fields. If that is not done, then it is
assumed that the object is deserialized using a user code class loader.

Add test case for user code types in ExecutionConfig

This closes #1818.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0331882a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0331882a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0331882a

Branch: refs/heads/master
Commit: 0331882af124a1750a54b4c6a9bd13819d65f72e
Parents: 82b5824
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Mar 18 16:22:04 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 1 18:09:39 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  31 ++---
 .../flink/api/common/ExecutionConfig.java       | 121 ++++++++++++++++---
 .../flink/configuration/ConfigConstants.java    |   8 +-
 .../plantranslate/JobGraphGenerator.java        |  12 +-
 .../deployment/TaskDeploymentDescriptor.java    |  48 +++++---
 .../runtime/executiongraph/ExecutionGraph.java  |  13 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  23 ++--
 .../apache/flink/runtime/taskmanager/Task.java  |  16 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../TaskDeploymentDescriptorTest.java           |   2 +
 .../flink/api/scala/ScalaShellITCase.scala      |   2 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  15 +--
 flink-tests/pom.xml                             |  19 +++
 .../assembly/test-usercodetype-assembly.xml     |  37 ++++++
 .../test/classloading/ClassLoaderITCase.java    |  12 ++
 .../test/classloading/jar/UserCodeType.java     |  76 ++++++++++++
 16 files changed, 367 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index fdc90a2..1c5735d 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -153,23 +153,24 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
 
 The following parameters configure Flink's JobManager and TaskManagers.
 
-- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost).
-- `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123).
+- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: **localhost**).
+- `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: **6123**).
 - `taskmanager.hostname`: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.
-- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: 6122).
-- `taskmanager.data.port`: The task manager's port used for data exchange operations (DEFAULT: 6121).
-- `jobmanager.heap.mb`: JVM heap size (in megabytes) for the JobManager (DEFAULT: 256).
-- `taskmanager.heap.mb`: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: 512). On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.
-- `taskmanager.numberOfTaskSlots`: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
-- `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir).
-- `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048).
-- `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on the JVM's heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by `taskmanager.memory.fraction`. (DEFAULT: -1)
-- `taskmanager.memory.fraction`: The relative amount of memory that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that TaskManagers reserve 80% of the JVM's heap space for internal data buffers, leaving 20% of the JVM's heap space free for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
+- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **6122**).
+- `taskmanager.data.port`: The task manager's port used for data exchange operations (DEFAULT: **6121**).
+- `jobmanager.heap.mb`: JVM heap size (in megabytes) for the JobManager (DEFAULT: **256**).
+- `taskmanager.heap.mb`: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: **512**). On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.
+- `taskmanager.numberOfTaskSlots`: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: **1**). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
+- `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: **The system's tmp dir**).
+- `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**).
+- `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on the JVM's heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by `taskmanager.memory.fraction`. (DEFAULT: **-1**)
+- `taskmanager.memory.fraction`: The relative amount of memory that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that TaskManagers reserve 80% of the JVM's heap space for internal data buffers, leaving 20% of the JVM's heap space free for objects created by user-defined functions. (DEFAULT: **0.7**) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
 - `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.
 - `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
-- `blob.fetch.retries`: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: 50).
-- `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: 50).
-- `blob.fetch.backlog`: The maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows (DEFAULT: 1000).
+- `blob.fetch.retries`: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**).
+- `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: **50**).
+- `blob.fetch.backlog`: The maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows (DEFAULT: **1000**).
+- `task.cancellation-interval`: Time interval between two successive task cancellation attempts in milliseconds (DEFAULT: **30000**).
 
 
 ### Distributed Coordination (via Akka)
@@ -185,7 +186,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
 - `akka.transport.threshold`: Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value (DEFAULT: **300**).
 - `akka.tcp.timeout`: Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT: **akka.ask.timeout**).
 - `akka.throughput`: Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness (DEFAULT: **15**).
-- `akka.log.lifecycle.events`: Turns on the Akka's remote logging of events. Set this value to 'on' in case of debugging (DEFAULT: **off**).
+- `akka.log.lifecycle.events`: Turns on the Akka's remote logging of events. Set this value to 'true' in case of debugging (DEFAULT: **false**).
 - `akka.startup-timeout`: Timeout after which the startup of a remote component is considered being failed (DEFAULT: **akka.ask.timeout**).
 
 ### JobManager Web Frontend

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 205593c..9225cdd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,9 +22,10 @@ import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.util.SerializedValue;
 
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -61,6 +62,9 @@ public class ExecutionConfig implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
+	// Key for storing it in the Job Configuration
+	public static final String CONFIG_KEY = "runtime.config";
+
 	/**
 	 * The constant to use for the parallelism, if the system should use the number
 	 *  of currently available slots.
@@ -97,8 +101,6 @@ public class ExecutionConfig implements Serializable {
 	/** If set to true, progress updates are printed to System.out during execution */
 	private boolean printProgressDuringExecution = true;
 
-	private GlobalJobParameters globalJobParameters;
-
 	private long autoWatermarkInterval = 0;
 
 	/**
@@ -109,22 +111,42 @@ public class ExecutionConfig implements Serializable {
 
 	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
 	
-	private long taskCancellationIntervalMillis = ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS;
+	private long taskCancellationIntervalMillis = -1;
+
+	// ------------------------------- User code values --------------------------------------------
+
+	private transient GlobalJobParameters globalJobParameters;
 
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
 
-	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();
+	private LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();
+
+	private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
+
+	private LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();
+
+	private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();
+
+	private LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();
+
+	private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
+
+	// ----------------------- Helper values for serialized user objects ---------------------------
+
+	private SerializedValue<GlobalJobParameters> serializedGlobalJobParameters;
+
+	private SerializedValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> serializedRegisteredTypesWithKryoSerializers;
 
-	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
+	private SerializedValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> serializedRegisteredTypesWithKryoSerializerClasses;
 
-	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();
+	private SerializedValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> serializedDefaultKryoSerializers;
 
-	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();
+	private SerializedValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> serializedDefaultKryoSerializerClasses;
 
-	private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();
+	private SerializedValue<LinkedHashSet<Class<?>>> serializedRegisteredKryoTypes;
 
-	private final LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
+	private SerializedValue<LinkedHashSet<Class<?>>> serializedRegisteredPojoTypes;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -230,11 +252,6 @@ public class ExecutionConfig implements Serializable {
 	 * @param interval the interval (in milliseconds).
 	 */
 	public ExecutionConfig setTaskCancellationInterval(long interval) {
-		if(interval < 0) {
-			throw new IllegalArgumentException(
-				"The task cancellation interval cannot be negative."
-			);
-		}
 		this.taskCancellationIntervalMillis = interval;
 		return this;
 	}
@@ -662,6 +679,79 @@ public class ExecutionConfig implements Serializable {
 		this.autoTypeRegistrationEnabled = false;
 	}
 
+	/**
+	 * Deserializes user code objects given a user code class loader
+	 *
+	 * @param userCodeClassLoader User code class loader
+	 * @throws IOException Thrown if an IOException occurs while loading the classes
+	 * @throws ClassNotFoundException Thrown if the given class cannot be loaded
+	 */
+	public void deserializeUserCode(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
+		if (serializedRegisteredKryoTypes != null) {
+			registeredKryoTypes = serializedRegisteredKryoTypes.deserializeValue(userCodeClassLoader);
+		} else {
+			registeredKryoTypes = new LinkedHashSet<>();
+		}
+
+		if (serializedRegisteredPojoTypes != null) {
+			registeredPojoTypes = serializedRegisteredPojoTypes.deserializeValue(userCodeClassLoader);
+		} else {
+			registeredPojoTypes = new LinkedHashSet<>();
+		}
+
+		if (serializedRegisteredTypesWithKryoSerializerClasses != null) {
+			registeredTypesWithKryoSerializerClasses = serializedRegisteredTypesWithKryoSerializerClasses.deserializeValue(userCodeClassLoader);
+		} else {
+			registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
+		}
+
+		if (serializedRegisteredTypesWithKryoSerializers != null) {
+			registeredTypesWithKryoSerializers = serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
+		} else {
+			registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
+		}
+
+		if (serializedDefaultKryoSerializers != null) {
+			defaultKryoSerializers = serializedDefaultKryoSerializers.deserializeValue(userCodeClassLoader);
+		} else {
+			defaultKryoSerializers = new LinkedHashMap<>();
+
+		}
+
+		if (serializedDefaultKryoSerializerClasses != null) {
+			defaultKryoSerializerClasses = serializedDefaultKryoSerializerClasses.deserializeValue(userCodeClassLoader);
+		} else {
+			defaultKryoSerializerClasses = new LinkedHashMap<>();
+		}
+
+		if (serializedGlobalJobParameters != null) {
+			globalJobParameters = serializedGlobalJobParameters.deserializeValue(userCodeClassLoader);
+		}
+	}
+
+	public void serializeUserCode() throws IOException {
+		serializedRegisteredKryoTypes = new SerializedValue<>(registeredKryoTypes);
+		registeredKryoTypes = null;
+
+		serializedRegisteredPojoTypes = new SerializedValue<>(registeredPojoTypes);
+		registeredPojoTypes = null;
+
+		serializedRegisteredTypesWithKryoSerializerClasses = new SerializedValue<>(registeredTypesWithKryoSerializerClasses);
+		registeredTypesWithKryoSerializerClasses = null;
+
+		serializedRegisteredTypesWithKryoSerializers = new SerializedValue<>(registeredTypesWithKryoSerializers);
+		registeredTypesWithKryoSerializers = null;
+
+		serializedDefaultKryoSerializers = new SerializedValue<>(defaultKryoSerializers);
+		defaultKryoSerializers = null;
+
+		serializedDefaultKryoSerializerClasses = new SerializedValue<>(defaultKryoSerializerClasses);
+		defaultKryoSerializerClasses = null;
+
+		serializedGlobalJobParameters = new SerializedValue<>(globalJobParameters);
+		globalJobParameters = null;
+	}
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof ExecutionConfig) {
@@ -754,5 +844,4 @@ public class ExecutionConfig implements Serializable {
 			return null;
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 755ea7e..ba2d880 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -220,7 +220,13 @@ public final class ConfigConstants {
 	 */
 	public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
 
-	// ------------------------ Runtime Algorithms ----------------------------
+	/**
+	 * Time interval between two successive task cancellation attempts in milliseconds.
+	 */
+	@PublicEvolving
+	public static final String TASK_CANCELLATION_INTERVAL_MILLIS = "task.cancellation-interval";
+
+	// --------------------------- Runtime Algorithms -------------------------------
 	
 	/**
 	 * Parameter for the maximum fan for out-of-core algorithms.

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index c7aaa7d..696a05d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -83,6 +83,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -212,9 +213,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		attachOperatorNamesAndDescriptions();
 
 		// ----------- finalize the job graph -----------
-		
+
 		// create the job graph object
 		JobGraph graph = new JobGraph(jobId, program.getJobName(), program.getOriginalPlan().getExecutionConfig());
+
 		graph.setAllowQueuedScheduling(false);
 		graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
 
@@ -240,6 +242,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		this.auxVertices = null;
 		this.iterations = null;
 		this.iterationStack = null;
+
+		try {
+			// make sure that we can send the ExecutionConfig using the system class loader
+			graph.getExecutionConfig().serializeUserCode();
+		} catch (IOException e) {
+			throw new CompilerException("Could not serialize the user code object in the " +
+				"ExecutionConfig.", e);
+		}
 		
 		// return job graph
 		return graph;

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 60b8ba6..73c37b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -98,13 +98,23 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 * Constructs a task deployment descriptor.
 	 */
 	public TaskDeploymentDescriptor(
-			JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId,
-			ExecutionConfig executionConfig, String taskName, int indexInSubtaskGroup, int numberOfSubtasks,
-			int attemptNumber, Configuration jobConfiguration, Configuration taskConfiguration,
-			String invokableClassName, List<ResultPartitionDeploymentDescriptor> producedPartitions,
+			JobID jobID,
+			JobVertexID vertexID,
+			ExecutionAttemptID executionId,
+			ExecutionConfig executionConfig,
+			String taskName,
+			int indexInSubtaskGroup,
+			int numberOfSubtasks,
+			int attemptNumber,
+			Configuration jobConfiguration,
+			Configuration taskConfiguration,
+			String invokableClassName,
+			List<ResultPartitionDeploymentDescriptor> producedPartitions,
 			List<InputGateDeploymentDescriptor> inputGates,
-			List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
-			int targetSlotNumber, SerializedValue<StateHandle<?>> operatorState,
+			List<BlobKey> requiredJarFiles,
+			List<URL> requiredClasspaths,
+			int targetSlotNumber,
+			SerializedValue<StateHandle<?>> operatorState,
 			long recoveryTimestamp) {
 
 		checkArgument(indexInSubtaskGroup >= 0);
@@ -133,13 +143,22 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	}
 
 	public TaskDeploymentDescriptor(
-			JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId,
-			ExecutionConfig executionConfig, String taskName, int indexInSubtaskGroup, int numberOfSubtasks,
-			int attemptNumber, Configuration jobConfiguration, Configuration taskConfiguration,
-			String invokableClassName, List<ResultPartitionDeploymentDescriptor> producedPartitions,
-			List<InputGateDeploymentDescriptor> inputGates,
-			List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
-			int targetSlotNumber) {
+		JobID jobID,
+		JobVertexID vertexID,
+		ExecutionAttemptID executionId,
+		ExecutionConfig executionConfig,
+		String taskName,
+		int indexInSubtaskGroup,
+		int numberOfSubtasks,
+		int attemptNumber,
+		Configuration jobConfiguration,
+		Configuration taskConfiguration,
+		String invokableClassName,
+		List<ResultPartitionDeploymentDescriptor> producedPartitions,
+		List<InputGateDeploymentDescriptor> inputGates,
+		List<BlobKey> requiredJarFiles,
+		List<URL> requiredClasspaths,
+		int targetSlotNumber) {
 
 		this(jobID, vertexID, executionId, executionConfig, taskName, indexInSubtaskGroup,
 				numberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration, invokableClassName,
@@ -147,7 +166,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	}
 
 	/**
-	 * Returns the execution configuration (see {@link ExecutionConfig}) related to the specific job.
+	 * Returns the execution configuration (see {@link ExecutionConfig}) related to the
+	 * specific job.
 	 */
 	public ExecutionConfig getExecutionConfig() {
 		return executionConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 026f6b2..3c20647 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -237,7 +236,6 @@ public class ExecutionGraph implements Serializable {
 	private ExecutionContext executionContext;
 
 	// ------ Fields that are only relevant for archived execution graphs ------------
-
 	private String jsonPlan;
 
 	// --------------------------------------------------------------------------------------------
@@ -307,7 +305,9 @@ public class ExecutionGraph implements Serializable {
 
 		this.requiredJarFiles = requiredJarFiles;
 		this.requiredClasspaths = requiredClasspaths;
-		this.executionConfig = Preconditions.checkNotNull(config);
+
+		this.executionConfig = checkNotNull(config);
+
 		this.timeout = timeout;
 
 		this.restartStrategy = restartStrategy;
@@ -967,8 +967,13 @@ public class ExecutionGraph implements Serializable {
 		isArchived = true;
 	}
 
+	/**
+	 * Returns the {@link ExecutionConfig}.
+	 *
+	 * @return ExecutionConfig
+	 */
 	public ExecutionConfig getExecutionConfig() {
-		return this.executionConfig;
+		return executionConfig;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index ed714a4..b7c6551 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
@@ -35,13 +34,13 @@ import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Collections;
 import java.util.Set;
-import java.util.LinkedHashSet;
-import java.util.Iterator;
 
 /**
  * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
@@ -80,8 +79,6 @@ public class JobGraph implements Serializable {
 	/** Name of this job. */
 	private final String jobName;
 
-	private final ExecutionConfig executionConfig;
-
 	/** The number of seconds after which the corresponding ExecutionGraph is removed at the
 	 * job manager after it has been executed. */
 	private long sessionTimeout = 0;
@@ -96,7 +93,10 @@ public class JobGraph implements Serializable {
 	private JobSnapshottingSettings snapshotSettings;
 
 	/** List of classpaths required to run this job. */
-	private List<URL> classpaths = Collections.<URL>emptyList();
+	private List<URL> classpaths = Collections.emptyList();
+
+	/** Job specific execution config */
+	private ExecutionConfig executionConfig;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -107,7 +107,7 @@ public class JobGraph implements Serializable {
 	 * @param config The {@link ExecutionConfig} for the job.
 	 */
 	public JobGraph(ExecutionConfig config) {
-		this((String) null, config);
+		this(null, config);
 	}
 
 	/**
@@ -132,7 +132,7 @@ public class JobGraph implements Serializable {
 	public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
 		this.jobID = jobId == null ? new JobID() : jobId;
 		this.jobName = jobName == null ? "(unnamed job)" : jobName;
-		this.executionConfig = Preconditions.checkNotNull(config);
+		this.executionConfig = config == null ? new ExecutionConfig() : config;
 	}
 
 	/**
@@ -205,8 +205,13 @@ public class JobGraph implements Serializable {
 		return this.jobConfiguration;
 	}
 
+	/**
+	 * Returns the {@link ExecutionConfig}
+	 *
+	 * @return ExecutionConfig
+	 */
 	public ExecutionConfig getExecutionConfig() {
-		return this.executionConfig;
+		return executionConfig;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f3beae3..b401fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -221,6 +222,9 @@ public class Task implements Runnable {
 	/** The job specific execution configuration (see {@link ExecutionConfig}). */
 	private final ExecutionConfig executionConfig;
 
+	/** Interval between two successive task cancellation attempts */
+	private final long taskCancellationInterval;
+
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
 	 * be undone in the case of a failing task deployment.</p>
@@ -267,6 +271,15 @@ public class Task implements Runnable {
 
 		this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
 
+		if (executionConfig.getTaskCancellationInterval() < 0) {
+			taskCancellationInterval = jobConfiguration.getLong(
+				ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
+				ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
+		} else {
+			taskCancellationInterval = executionConfig.getTaskCancellationInterval();
+		}
+
+
 		// create the reader and writer structures
 
 		final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';
@@ -456,6 +469,8 @@ public class Task implements Runnable {
 			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
 			final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache);
 
+			executionConfig.deserializeUserCode(userCodeClassLoader);
+
 			// now load the task's invokable code
 			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
 
@@ -836,7 +851,6 @@ public class Task implements Runnable {
 						// because the canceling may block on user code, we cancel from a separate thread
 						// we do not reuse the async call handler, because that one may be blocked, in which
 						// case the canceling could not continue
-						long taskCancellationInterval = this.executionConfig.getTaskCancellationInterval();
 						Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask,
 							taskCancellationInterval);
 						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b487e30..baffa2a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1071,7 +1071,7 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
-        val restartStrategy = Option(jobGraph.getExecutionConfig.getRestartStrategy())
+        val restartStrategy = Option(jobGraph.getExecutionConfig().getRestartStrategy())
           .map(RestartStrategyFactory.createRestartStrategy(_)) match {
             case Some(strategy) => strategy
             case None => defaultRestartStrategy

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index e839c97..63e62bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -78,6 +78,8 @@ public class TaskDeploymentDescriptorTest {
 			assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
 			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
 			assertEquals(orig.getInputGates(), copy.getInputGates());
+			// load serialized values in ExecutionConfig
+			copy.getExecutionConfig().deserializeUserCode(getClass().getClassLoader());
 			assertEquals(orig.getExecutionConfig(), copy.getExecutionConfig());
 
 			assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 683ec63..37e7b51 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -150,7 +150,7 @@ class ScalaShellITCase extends TestLogger {
     val input =
       """
         import org.apache.flink.ml.math._
-        val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
+        val denseVectors = env.fromElements[Vector](DenseVector(1.0, 2.0, 3.0))
         denseVectors.print()
       """.stripMargin
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c339a07..c3b515e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -55,6 +55,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -128,7 +129,12 @@ public class StreamingJobGraphGenerator {
 		
 		configureCheckpointing();
 
-		configureRestartStrategy();
+		try {
+			// make sure that we can send the ExecutionConfig without user code object problems
+			jobGraph.getExecutionConfig().serializeUserCode();
+		} catch (IOException e) {
+			throw new IllegalStateException("Could not serialize ExecutionConfig.", e);
+		}
 
 		return jobGraph;
 	}
@@ -477,18 +483,13 @@ public class StreamingJobGraphGenerator {
 
 			// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
 			if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
-				// if the user enabled checkpointing, the default number of exec retries is infinitive.
+				// if the user enabled checkpointing, the default number of exec retries is infinite.
 				streamGraph.getExecutionConfig().setRestartStrategy(
 					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
 			}
 		}
 	}
 
-	private void configureRestartStrategy() {
-		jobGraph.getExecutionConfig().setRestartStrategy(
-			streamGraph.getExecutionConfig().getRestartStrategy());
-	}
-
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 34b3d66..ee7d0a3 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -371,6 +371,25 @@ under the License.
 						</configuration>
 					</execution>
 					<execution>
+						<id>create-usercodetype-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.test.classloading.jar.UserCodeType</mainClass>
+								</manifest>
+							</archive>
+							<finalName>usercodetype</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-usercodetype-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+					<execution>
 						<id>create-custominputsplit-jar</id>
 						<phase>process-test-classes</phase>
 						<goals>

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-tests/src/test/assembly/test-usercodetype-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-usercodetype-assembly.xml b/flink-tests/src/test/assembly/test-usercodetype-assembly.xml
new file mode 100644
index 0000000..54b9b67
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-usercodetype-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/test/classloading/jar/UserCodeType.class</include>
+				<include>org/apache/flink/test/classloading/jar/UserCodeType$*.class</include>
+				<include>org.apache.flink.test.classloading.jar.UserCodeType$CustomType</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 9d0d637..04abdf8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -45,6 +45,8 @@ public class ClassLoaderITCase {
 
 	private static final String KMEANS_JAR_PATH = "kmeans-test-jar.jar";
 
+	private static final String USERCODETYPE_JAR_PATH = "usercodetype-test-jar.jar";
+
 	@Rule
 	public TemporaryFolder folder = new TemporaryFolder();
 
@@ -135,6 +137,16 @@ public class ClassLoaderITCase {
 										"25"
 									});
 				kMeansProg.invokeInteractiveModeForExecution();
+
+				// test FLINK-3633
+				PackagedProgram userCodeTypeProg = new PackagedProgram(
+					new File(USERCODETYPE_JAR_PATH),
+					new String[] { USERCODETYPE_JAR_PATH,
+						"localhost",
+						String.valueOf(port),
+					});
+
+				userCodeTypeProg.invokeInteractiveModeForExecution();
 			}
 			finally {
 				testCluster.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/0331882a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
new file mode 100644
index 0000000..333c01a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+
+/**
+ * Test class used by the {@link org.apache.flink.test.classloading.ClassLoaderITCase}.
+ *
+ * This class is used to test FLINK-3633
+ */
+public class UserCodeType {
+	public static class CustomType {
+		private final int value;
+
+		public CustomType(int value) {
+			this.value = value;
+		}
+
+		@Override
+		public String toString() {
+			return "CustomType(" + value + ")";
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		String jarFile = args[0];
+		String host = args[1];
+		int port = Integer.parseInt(args[2]);
+
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+
+		DataSet<Integer> input = env.fromElements(1,2,3,4,5);
+
+		DataSet<CustomType> customTypes = input.map(new MapFunction<Integer, CustomType>() {
+			private static final long serialVersionUID = -5878758010124912128L;
+
+			@Override
+			public CustomType map(Integer integer) throws Exception {
+				return new CustomType(integer);
+			}
+		}).rebalance();
+
+		DataSet<Integer> result = customTypes.map(new MapFunction<CustomType, Integer>() {
+			private static final long serialVersionUID = -7950126399899584991L;
+
+			@Override
+			public Integer map(CustomType value) throws Exception {
+				return value.value;
+			}
+		});
+
+		result.output(new DiscardingOutputFormat<Integer>());
+
+		env.execute();
+	}
+}