You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/15 14:50:35 UTC

[1/4] flink git commit: [FLINK-2373] Add configuration parameter to createRemoteEnvironment method

Repository: flink
Updated Branches:
  refs/heads/master 8a75025ab -> 2cc5d98c6


[FLINK-2373] Add configuration parameter to createRemoteEnvironment method

This closes #1066


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e78b80c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e78b80c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e78b80c4

Branch: refs/heads/master
Commit: e78b80c4e4a99385ae5ef5a9afc9f8a64533a9e3
Parents: 8a75025
Author: Andreas Kunft <an...@tu-berlin.de>
Authored: Tue Jul 28 16:58:32 2015 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:19:04 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/RemoteExecutor.java |  41 +++--
 .../RemoteExecutorHostnameResolutionTest.java   |   3 +-
 .../apache/flink/api/common/PlanExecutor.java   |   8 +-
 .../flink/api/java/ExecutionEnvironment.java    |  22 +++
 .../flink/api/java/RemoteEnvironment.java       |  11 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |  33 +++-
 .../api/java/ScalaShellRemoteEnvironment.java   |   3 +-
 .../ExecutionEnvironmentITCase.java             |   4 +-
 .../RemoteEnvironmentITCase.java                | 162 +++++++++++++++++++
 9 files changed, 263 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index d1be6d2..20169f6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -54,30 +54,47 @@ public class RemoteExecutor extends PlanExecutor {
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
 
 	private final List<String> jarFiles;
-	private final Configuration configuration;
+
+	private final Configuration clientConfiguration;
 	
 	public RemoteExecutor(String hostname, int port) {
-		this(hostname, port, Collections.<String>emptyList());
+		this(hostname, port, Collections.<String>emptyList(), new Configuration());
 	}
 	
 	public RemoteExecutor(String hostname, int port, String jarFile) {
-		this(hostname, port, Collections.singletonList(jarFile));
+		this(hostname, port, Collections.singletonList(jarFile), new Configuration());
 	}
 	
 	public RemoteExecutor(String hostport, String jarFile) {
-		this(getInetFromHostport(hostport), Collections.singletonList(jarFile));
+		this(getInetFromHostport(hostport), Collections.singletonList(jarFile), new Configuration());
 	}
 	
 	public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
-		this(new InetSocketAddress(hostname, port), jarFiles);
+		this(new InetSocketAddress(hostname, port), jarFiles, new Configuration());
+	}
+
+	public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) {
+		this(hostname, port, Collections.<String>emptyList(), clientConfiguration);
+	}
+
+	public RemoteExecutor(String hostname, int port, String jarFile, Configuration clientConfiguration) {
+		this(hostname, port, Collections.singletonList(jarFile), clientConfiguration);
+	}
+
+	public RemoteExecutor(String hostport, String jarFile, Configuration clientConfiguration) {
+		this(getInetFromHostport(hostport), Collections.singletonList(jarFile), clientConfiguration);
+	}
+
+	public RemoteExecutor(String hostname, int port, List<String> jarFiles, Configuration clientConfiguration) {
+		this(new InetSocketAddress(hostname, port), jarFiles, clientConfiguration);
 	}
 
-	public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
+	public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles, Configuration clientConfiguration) {
 		this.jarFiles = jarFiles;
-		configuration = new Configuration();
+		this.clientConfiguration = clientConfiguration;
 
-		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
-		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
+		clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
+		clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
 	}
 
 	@Override
@@ -87,7 +104,7 @@ public class RemoteExecutor extends PlanExecutor {
 	}
 	
 	public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
-		Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
+		Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
 		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
 		
 		JobSubmissionResult result = c.run(p, -1, true);
@@ -103,7 +120,7 @@ public class RemoteExecutor extends PlanExecutor {
 		File jarFile = new File(jarPath);
 		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
 		
-		Client c = new Client(configuration, program.getUserCodeClassLoader(), -1);
+		Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1);
 		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
 		
 		JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
@@ -118,7 +135,7 @@ public class RemoteExecutor extends PlanExecutor {
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
 		JobWithJars p = new JobWithJars(plan, this.jarFiles);
-		Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
+		Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
 		
 		OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();

http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index 47236af..7f67567 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.junit.Test;
 
@@ -67,7 +68,7 @@ public class RemoteExecutorHostnameResolutionTest {
 		
 		try {
 			InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
-			RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
+			RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList(), new Configuration());
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 74bdb09..1294011 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -105,11 +105,12 @@ public abstract class PlanExecutor {
 	 * 
 	 * @param hostname The address of the JobManager to send the program to.
 	 * @param port The port of the JobManager to send the program to.
+	 * @param clientConfiguration The configuration for the client (Akka, default.parallelism).
 	 * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
 	 *                 from within the UDFs.
 	 * @return A remote executor.
 	 */
-	public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) {
+	public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, String... jarFiles) {
 		if (hostname == null) {
 			throw new IllegalArgumentException("The hostname must not be null.");
 		}
@@ -123,7 +124,10 @@ public abstract class PlanExecutor {
 																		: Arrays.asList(jarFiles); 
 		
 		try {
-			return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files);
+			PlanExecutor executor = (clientConfiguration == null) ?
+					reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files) :
+					reClass.getConstructor(String.class, int.class, List.class, Configuration.class).newInstance(hostname, port, files, clientConfiguration);
+			return executor;
 		}
 		catch (Throwable t) {
 			throw new RuntimeException("An error occurred while loading the remote executor ("

http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 32049c4..297982c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1106,6 +1106,28 @@ public abstract class ExecutionEnvironment {
 	}
 
 	/**
+	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
+	 * to a cluster for execution. Note that all file paths used in the program must be accessible from the
+	 * cluster. The custom configuration file is used to configure Akka specific configuration parameters
+	 * for the Client only; Program parallelism can be set via {@link ExecutionEnvironment#setParallelism(int)}.
+	 *
+	 * Cluster configuration has to be done in the remotely running Flink instance.
+	 *
+	 * @param host The host name or address of the master (JobManager), where the program should be executed.
+	 * @param port The port of the master (JobManager), where the program should be executed.
+	 * @param clientConfiguration Pass a custom configuration to the Client.
+	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
+	 *                 provided in the JAR files.
+	 * @return A remote environment that executes the program on a cluster.
+	 */
+	public static ExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) {
+		RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles);
+		rec.setClientConfiguration(clientConfiguration);
+		return rec;
+	}
+
+	/**
 	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program 
 	 * to a cluster for execution. Note that all file paths used in the program must be accessible from the
 	 * cluster. The execution will use the specified parallelism.

http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 6f84077..1f73e93 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.configuration.Configuration;
 
 /**
  * An {@link ExecutionEnvironment} that sends programs 
@@ -35,6 +36,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	protected final int port;
 	
 	private final String[] jarFiles;
+
+	private Configuration clientConfiguration;
 	
 	/**
 	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
@@ -65,7 +68,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
 		
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 
 		this.lastJobExecutionResult = executor.executePlan(p);
@@ -78,7 +81,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		p.setDefaultParallelism(getParallelism());
 		registerCachedFilesWithPlan(p);
 		
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
 		return executor.getOptimizerPlanAsJSON(p);
 	}
 
@@ -87,4 +90,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
 				(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
 	}
+
+	public void setClientConfiguration(Configuration clientConfiguration) {
+		this.clientConfiguration = clientConfiguration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index cdf7211..39bb717 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -134,7 +134,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def getId: UUID = {
     javaEnv.getId
   }
-  
+
   /**
    * Gets the JobExecutionResult of the last executed job.
    */
@@ -181,13 +181,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
     javaEnv.addDefaultKryoSerializer(clazz, serializer)
   }
-  
+
   /**
    * Registers the given type with the serialization stack. If the type is eventually
    * serialized as a POJO, then the type is registered with the POJO serializer. If the
    * type ends up being serialized with Kryo, then it will be registered at Kryo to make
    * sure that only tags are written.
-   * 
+   *
    */
   def registerType(typeClass: Class[_]) {
     javaEnv.registerType(typeClass)
@@ -707,5 +707,32 @@ object ExecutionEnvironment {
     javaEnv.setParallelism(parallelism)
     new ExecutionEnvironment(javaEnv)
   }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program
+   * to a cluster for execution. Note that all file paths used in the program must be accessible
+   * from the cluster. The custom configuration file is used to configure Akka specific
+   * configuration parameters for the Client only; Program parallelism can be set via
+   * [[ExecutionEnvironment.setParallelism]].
+   *
+   * Cluster configuration has to be done in the remotely running Flink instance.
+   *
+   * @param host The host name or address of the master (JobManager), where the program should be
+   *             executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param clientConfiguration Pass a custom configuration to the Client.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   * @return A remote environment that executes the program on a cluster.
+   */
+  def createRemoteEnvironment(
+      host: String,
+      port: Int,
+      clientConfiguration: Configuration,
+      jarFiles: String*): ExecutionEnvironment = {
+    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, clientConfiguration, jarFiles: _*)
+    new ExecutionEnvironment(javaEnv)
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 54af5bc..a7dc708 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 
 import org.apache.flink.api.scala.FlinkILoop;
+import org.apache.flink.configuration.Configuration;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -78,7 +79,7 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 		alljars.add(jarFile);
 		String[] alljarsArr = new String[alljars.size()];
 		alljarsArr = alljars.toArray(alljarsArr);
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, alljarsArr);
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), alljarsArr);
 
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 		return executor.executePlan(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
index 109af1e..f4b3875 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
@@ -80,9 +80,7 @@ public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
 						out.collect(getRuntimeContext().getIndexOfThisSubtask());
 					}
 				});
-		List<Integer> resultCollection = new ArrayList<Integer>();
-		result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
-		env.execute();
+		List<Integer> resultCollection = result.collect();
 		assertEquals(PARALLELISM, resultCollection.size());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
new file mode 100644
index 0000000..68b099d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class RemoteEnvironmentITCase {
+
+	private static final int TM_SLOTS = 4;
+
+	private static final int NUM_TM = 1;
+
+	private static final int USER_DOP = 2;
+
+	private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
+
+	private static final String VALID_STARTUP_TIMEOUT = "100 s";
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void setupCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster.start();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Error starting test cluster: " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDownCluster() {
+		try {
+			cluster.stop();
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + t.getMessage());
+		}
+	}
+
+	/**
+	 * Ensure that that Akka configuration parameters can be set.
+	 */
+	@Test(expected=IllegalArgumentException.class)
+	public void testInvalidAkkaConfiguration() throws Throwable {
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
+
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+				cluster.hostname(),
+				cluster.getLeaderRPCPort(),
+				config
+		);
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<String> result = env.createInput(new TestNonRichInputFormat());
+		result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
+		try {
+			env.execute();
+			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
+		} catch (ProgramInvocationException ex) {
+			throw ex.getCause();
+		}
+	}
+
+	/**
+	 * Ensure that the program parallelism can be set even if the configuration is supplied.
+	 */
+	@Test
+	public void testUserSpecificParallelism() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
+
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+				cluster.hostname(),
+				cluster.getLeaderRPCPort(),
+				config
+		);
+		env.setParallelism(USER_DOP);
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
+				.rebalance()
+				.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
+					@Override
+					public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+						out.collect(getRuntimeContext().getIndexOfThisSubtask());
+					}
+				});
+		List<Integer> resultCollection = result.collect();
+		assertEquals(USER_DOP, resultCollection.size());
+	}
+
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(USER_DOP, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+}


[2/4] flink git commit: [FLINK-2655] Minimize intermediate merging of external merge sort.

Posted by fh...@apache.org.
[FLINK-2655] Minimize intermediate merging of external merge sort.

This closes #1118


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a75dd627
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a75dd627
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a75dd627

Branch: refs/heads/master
Commit: a75dd6270fd47720ae1c2ce9464ff6b8f7b43d39
Parents: e78b80c
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 10 10:16:35 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:20:01 2015 +0200

----------------------------------------------------------------------
 .../operators/sort/UnilateralSortMerger.java    | 47 +++++++++++---------
 1 file changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a75dd627/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index fd1062d..32fbb52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1513,11 +1513,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		}
 
 		/**
-		 * Merges the given sorted runs to a smaller number of sorted runs. 
-		 * 
+		 * Merges the given sorted runs to a smaller number of sorted runs.
+		 *
 		 * @param channelIDs The IDs of the sorted runs that need to be merged.
+		 * @param allReadBuffers
 		 * @param writeBuffers The buffers to be used by the writers.
-
 		 * @return A list of the IDs of the merged channels.
 		 * @throws IOException Thrown, if the readers or writers encountered an I/O problem.
 		 */
@@ -1525,34 +1525,41 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 					final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers)
 		throws IOException
 		{
-			final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxFanIn));
-			final int channelsToMergePerStep = (int) Math.ceil(channelIDs.size() / numMerges);
-			
+			// A channel list with length maxFanIn<sup>i</sup> can be merged to maxFanIn files in i-1 rounds where every merge
+			// is a full merge with maxFanIn input channels. A partial round includes merges with fewer than maxFanIn
+			// inputs. It is most efficient to perform the partial round first.
+			final double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(this.maxFanIn)) - 1;
+
+			final int numStart = channelIDs.size();
+			final int numEnd = (int) Math.pow(this.maxFanIn, scale);
+
+			final int numMerges = (int) Math.ceil((numStart - numEnd) / (double) (this.maxFanIn - 1));
+
+			final int numNotMerged = numEnd - numMerges;
+			final int numToMerge = numStart - numNotMerged;
+
+			// unmerged channel IDs are copied directly to the result list
+			final List<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>(numEnd);
+			mergedChannelIDs.addAll(channelIDs.subList(0, numNotMerged));
+
+			final int channelsToMergePerStep = (int) Math.ceil(numToMerge / (double) numMerges);
+
 			// allocate the memory for the merging step
 			final List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelsToMergePerStep);
 			getSegmentsForReaders(readBuffers, allReadBuffers, channelsToMergePerStep);
-			
-			// the list containing the IDs of the merged channels
-			final ArrayList<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>((int) (numMerges + 1));
 
-			final ArrayList<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
-			int channelNum = 0;
+			final List<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
+			int channelNum = numNotMerged;
 			while (isRunning() && channelNum < channelIDs.size()) {
 				channelsToMergeThisStep.clear();
 
 				for (int i = 0; i < channelsToMergePerStep && channelNum < channelIDs.size(); i++, channelNum++) {
 					channelsToMergeThisStep.add(channelIDs.get(channelNum));
 				}
-				
-				// merge only, if there is more than one channel
-				if (channelsToMergeThisStep.size() < 2)  {
-					mergedChannelIDs.addAll(channelsToMergeThisStep);
-				}
-				else {
-					mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
-				}
+
+				mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
 			}
-			
+
 			return mergedChannelIDs;
 		}
 


[3/4] flink git commit: [FLINK-2533] [java-api] Gap based random sample optimization.

Posted by fh...@apache.org.
[FLINK-2533] [java-api] Gap based random sample optimization.

This closes #1110


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c923fb3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c923fb3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c923fb3c

Branch: refs/heads/master
Commit: c923fb3c1c1d61462e1079198ae9fb735bb0acf2
Parents: a75dd62
Author: gallenvara <ga...@126.com>
Authored: Mon Sep 7 14:55:11 2015 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:20:41 2015 +0200

----------------------------------------------------------------------
 .../api/java/sampling/BernoulliSampler.java     | 36 ++++++++--
 .../flink/api/java/sampling/PoissonSampler.java | 74 +++++++++++++++-----
 .../flink/api/java/sampling/RandomSampler.java  |  2 +
 3 files changed, 87 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
index 0f5ecc6..99ea5de 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
@@ -28,12 +28,16 @@ import java.util.Random;
  * Bernoulli experiment.
  *
  * @param <T> The type of sample.
+ * @see <a href="http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/">Gap Sampling</a>
  */
 public class BernoulliSampler<T> extends RandomSampler<T> {
 	
 	private final double fraction;
 	private final Random random;
 	
+	// THRESHOLD is a tuning parameter for choosing sampling method according to the fraction.
+	private final static double THRESHOLD = 0.33;
+	
 	/**
 	 * Create a Bernoulli sampler with sample fraction and default random number generator.
 	 *
@@ -102,15 +106,35 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
 			}
 
 			private T getNextSampledElement() {
-				while (input.hasNext()) {
-					T element = input.next();
+				if (fraction <= THRESHOLD) {
+					double rand = random.nextDouble();
+					double u = Math.max(rand, EPSILON);
+					int gap = (int) (Math.log(u) / Math.log(1 - fraction));
+					int elementCount = 0;
+					if (input.hasNext()) {
+						T element = input.next();
+						while (input.hasNext() && elementCount < gap) {
+							element = input.next();
+							elementCount++;
+						}
+						if (elementCount < gap) {
+							return null;
+						} else {
+							return element;
+						}
+					} else {
+						return null;
+					}
+				} else {
+					while (input.hasNext()) {
+						T element = input.next();
 
-					if (random.nextDouble() <= fraction) {
-						return element;
+						if (random.nextDouble() <= fraction) {
+							return element;
+						}
 					}
+					return null;
 				}
-
-				return null;
 			}
 		};
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
index 3834d24..8701167 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.math3.distribution.PoissonDistribution;
 
 import java.util.Iterator;
+import java.util.Random;
 
 /**
  * A sampler implementation based on the Poisson Distribution. While sampling elements with fraction
@@ -28,11 +29,16 @@ import java.util.Iterator;
  *
  * @param <T> The type of sample.
  * @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
+ * @see <a href="http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/">Gap Sampling</a>
  */
 public class PoissonSampler<T> extends RandomSampler<T> {
 	
 	private PoissonDistribution poissonDistribution;
 	private final double fraction;
+	private final Random random;
+	
+	// THRESHOLD is a tuning parameter for choosing sampling method according to the fraction.
+	private final static double THRESHOLD = 0.4;
 	
 	/**
 	 * Create a poisson sampler which can sample elements with replacement.
@@ -47,6 +53,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 			this.poissonDistribution = new PoissonDistribution(fraction);
 			this.poissonDistribution.reseedRandomGenerator(seed);
 		}
+		this.random = new Random(seed);
 	}
 	
 	/**
@@ -60,6 +67,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 		if (this.fraction > 0) {
 			this.poissonDistribution = new PoissonDistribution(fraction);
 		}
+		this.random = new Random();
 	}
 	
 	/**
@@ -84,8 +92,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 				if (currentCount > 0) {
 					return true;
 				} else {
-					moveToNextElement();
-
+					samplingProcess();
 					if (currentCount > 0) {
 						return true;
 					} else {
@@ -93,28 +100,57 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 					}
 				}
 			}
-
-			private void moveToNextElement() {
-				while (input.hasNext()) {
-					currentElement = input.next();
-					currentCount = poissonDistribution.sample();
-					if (currentCount > 0) {
-						break;
-					}
-				}
-			}
 			
 			@Override
 			public T next() {
-				if (currentCount == 0) {
-					moveToNextElement();
+				if (currentCount <= 0) {
+					samplingProcess();
 				}
-
-				if (currentCount == 0) {
-					return null;
+				currentCount--;
+				return currentElement;
+			}
+			
+			public int poisson_ge1(double p){
+				// sample 'k' from Poisson(p), conditioned to k >= 1.
+				double q = Math.pow(Math.E, -p);
+				// simulate a poisson trial such that k >= 1.
+				double t = q + (1 - q) * random.nextDouble();
+				int k = 1;
+				// continue standard poisson generation trials.
+				t = t * random.nextDouble();
+				while (t > q) {
+					k++;
+					t = t * random.nextDouble();
+				}
+				return k;
+			}
+			
+			private void skipGapElements(int num) {
+				// skip the elements that occurrence number is zero.
+				int elementCount = 0;
+				while (input.hasNext() && elementCount < num){
+					currentElement = input.next();
+					elementCount++;
+				}
+			}
+			
+			private void samplingProcess(){
+				if (fraction <= THRESHOLD) {
+					double u = Math.max(random.nextDouble(), EPSILON);
+					int gap = (int) (Math.log(u) / -fraction);
+					skipGapElements(gap);
+					if (input.hasNext()) {
+						currentElement = input.next();
+						currentCount = poisson_ge1(fraction);
+					}
 				} else {
-					currentCount--;
-					return currentElement;
+					while (input.hasNext()){
+						currentElement = input.next();
+						currentCount = poissonDistribution.sample();
+						if (currentCount > 0) {
+							break;
+						}
+					}
 				}
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
index 5fe2920..7d74897 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
  * @param <T> The type of sampler data.
  */
 public abstract class RandomSampler<T> {
+
+	protected final static double EPSILON = 1e-5;
 	
 	protected final Iterator<T> EMPTY_ITERABLE = new SampledIterator<T>() {
 		@Override


[4/4] flink git commit: [FLINK-2665] [api] [runtime] Makes ExecutionConfig serializable by forcing Kryo serializers to be Serializable

Posted by fh...@apache.org.
[FLINK-2665] [api] [runtime] Makes ExecutionConfig serializable by forcing Kryo serializers to be Serializable

This closes #1128

-----
Closing PR:
This closes #867


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cc5d98c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cc5d98c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cc5d98c

Branch: refs/heads/master
Commit: 2cc5d98c690c1b1b1ff1f48628ca58b1b4f0c932
Parents: c923fb3
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 14 16:59:22 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:33:31 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 118 ++++-------------
 .../flink/api/java/ExecutionEnvironment.java    |  13 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  |  26 ++--
 .../flink/api/scala/ExecutionEnvironment.scala  |  10 +-
 .../api/io/avro/AvroRecordInputFormatTest.java  |   4 +-
 .../environment/StreamExecutionEnvironment.java |   5 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  10 +-
 .../RegisterTypeWithKryoSerializerITCase.java   | 129 +++++++++++++++++++
 8 files changed, 197 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index d93eba6..4ae0b8d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -21,9 +21,8 @@ package org.apache.flink.api.common;
 import com.esotericsoftware.kryo.Serializer;
 
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -94,19 +93,15 @@ public class ExecutionConfig implements Serializable {
 	private boolean timestampsEnabled = false;
 
 	// Serializers and types registered with Kryo and the PojoSerializer
-	// we store them in lists to ensure they are registered in order in all kryo instances.
+	// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
 
-	private final List<Entry<Class<?>, Serializer<?>>> registeredTypesWithKryoSerializers =
-			new ArrayList<Entry<Class<?>, Serializer<?>>>();
+	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<Class<?>, SerializableSerializer<?>>();
 
-	private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses =
-			new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>();
 
-	private final List<Entry<Class<?>, Serializer<?>>> defaultKryoSerializers =
-			new ArrayList<Entry<Class<?>, Serializer<?>>>();
+	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<Class<?>, SerializableSerializer<?>>();
 
-	private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses =
-			new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>();
 
 	private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<Class<?>>();
 
@@ -457,19 +452,12 @@ public class ExecutionConfig implements Serializable {
 	 * @param type The class of the types serialized with the given serializer.
 	 * @param serializer The serializer to use.
 	 */
-	public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
+	public <T extends Serializer<?> & Serializable>void addDefaultKryoSerializer(Class<?> type, T serializer) {
 		if (type == null || serializer == null) {
 			throw new NullPointerException("Cannot register null class or serializer.");
 		}
-		if (!(serializer instanceof java.io.Serializable)) {
-			throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), "
-					+ "as defined by java.io.Serializable. For stateless serializers, you can use the "
-					+ "'registerSerializer(Class, Class)' method to register the serializer via its class.");
-		}
-		Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, Serializer<?>>(type, serializer);
-		if(!defaultKryoSerializers.contains(e)) {
-			defaultKryoSerializers.add(e);
-		}
+
+		defaultKryoSerializers.put(type, new SerializableSerializer<T>(serializer));
 	}
 
 	/**
@@ -482,10 +470,7 @@ public class ExecutionConfig implements Serializable {
 		if (type == null || serializerClass == null) {
 			throw new NullPointerException("Cannot register null class or serializer.");
 		}
-		Entry<Class<?>, Class<? extends Serializer<?>>> e = new Entry<Class<?>, Class<? extends Serializer<?>>>(type, serializerClass);
-		if(!defaultKryoSerializerClasses.contains(e)) {
-			defaultKryoSerializerClasses.add(e);
-		}
+		defaultKryoSerializerClasses.put(type, serializerClass);
 	}
 
 	/**
@@ -497,19 +482,12 @@ public class ExecutionConfig implements Serializable {
 	 * @param type The class of the types serialized with the given serializer.
 	 * @param serializer The serializer to use.
 	 */
-	public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
+	public <T extends Serializer<?> & Serializable>void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
 		if (type == null || serializer == null) {
 			throw new NullPointerException("Cannot register null class or serializer.");
 		}
-		if (!(serializer instanceof java.io.Serializable)) {
-			throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), "
-					+ "as defined by java.io.Serializable. For stateless serializers, you can use the "
-					+ "'registerSerializer(Class, Class)' method to register the serializer via its class.");
-		}
-		Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, Serializer<?>>(type, serializer);
-		if(!registeredTypesWithKryoSerializers.contains(e)) {
-			registeredTypesWithKryoSerializers.add(e);
-		}
+
+		registeredTypesWithKryoSerializers.put(type, new SerializableSerializer<T>(serializer));
 	}
 
 	/**
@@ -522,10 +500,7 @@ public class ExecutionConfig implements Serializable {
 		if (type == null || serializerClass == null) {
 			throw new NullPointerException("Cannot register null class or serializer.");
 		}
-		Entry<Class<?>, Class<? extends Serializer<?>>> e = new Entry<Class<?>, Class<? extends Serializer<?>>>(type, serializerClass);
-		if(!registeredTypesWithKryoSerializerClasses.contains(e)) {
-			registeredTypesWithKryoSerializerClasses.add(e);
-		}
+		registeredTypesWithKryoSerializerClasses.put(type, serializerClass);
 	}
 
 	/**
@@ -563,14 +538,14 @@ public class ExecutionConfig implements Serializable {
 	/**
 	 * Returns the registered types with Kryo Serializers.
 	 */
-	public List<Entry<Class<?>, Serializer<?>>> getRegisteredTypesWithKryoSerializers() {
+	public LinkedHashMap<Class<?>, SerializableSerializer<?>> getRegisteredTypesWithKryoSerializers() {
 		return registeredTypesWithKryoSerializers;
 	}
 
 	/**
 	 * Returns the registered types with their Kryo Serializer classes.
 	 */
-	public List<Entry<Class<?>, Class<? extends Serializer<?>>>> getRegisteredTypesWithKryoSerializerClasses() {
+	public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getRegisteredTypesWithKryoSerializerClasses() {
 		return registeredTypesWithKryoSerializerClasses;
 	}
 
@@ -578,14 +553,14 @@ public class ExecutionConfig implements Serializable {
 	/**
 	 * Returns the registered default Kryo Serializers.
 	 */
-	public List<Entry<Class<?>, Serializer<?>>> getDefaultKryoSerializers() {
+	public LinkedHashMap<Class<?>, SerializableSerializer<?>> getDefaultKryoSerializers() {
 		return defaultKryoSerializers;
 	}
 
 	/**
 	 * Returns the registered default Kryo Serializer classes.
 	 */
-	public List<Entry<Class<?>, Class<? extends Serializer<?>>>> getDefaultKryoSerializerClasses() {
+	public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getDefaultKryoSerializerClasses() {
 		return defaultKryoSerializerClasses;
 	}
 
@@ -633,60 +608,17 @@ public class ExecutionConfig implements Serializable {
 
 	// ------------------------------ Utilities  ----------------------------------
 
-	public static class Entry<K, V> implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-
-		private final K k;
-		private final V v;
-
-		public Entry(K k, V v) {
-			this.k = k;
-			this.v = v;
-		}
-
-		public K getKey() {
-			return k;
-		}
-
-		public V getValue() {
-			return v;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
+	public static class SerializableSerializer<T extends Serializer<?> & Serializable> implements Serializable {
+		private static final long serialVersionUID = 4687893502781067189L;
 
-			Entry entry = (Entry) o;
+		private T serializer;
 
-			if (k != null ? !k.equals(entry.k) : entry.k != null) {
-				return false;
-			}
-			if (v != null ? !v.equals(entry.v) : entry.v != null) {
-				return false;
-			}
-
-			return true;
-		}
-
-		@Override
-		public int hashCode() {
-			int result = k != null ? k.hashCode() : 0;
-			result = 31 * result + (v != null ? v.hashCode() : 0);
-			return result;
+		public SerializableSerializer(T serializer) {
+			this.serializer = serializer;
 		}
 
-		@Override
-		public String toString() {
-			return "Entry{" +
-					"k=" + k +
-					", v=" + v +
-					'}';
+		public T getSerializer() {
+			return serializer;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 297982c..64b3a1d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -271,7 +272,7 @@ public abstract class ExecutionEnvironment {
 	 * @param type The class of the types serialized with the given serializer.
 	 * @param serializer The serializer to use.
 	 */
-	public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
+	public <T extends Serializer<?> & Serializable>void addDefaultKryoSerializer(Class<?> type, T serializer) {
 		config.addDefaultKryoSerializer(type, serializer);
 	}
 
@@ -294,7 +295,7 @@ public abstract class ExecutionEnvironment {
 	 * @param type The class of the types serialized with the given serializer.
 	 * @param serializer The serializer to use.
 	 */
-	public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
+	public <T extends Serializer<?> & Serializable>void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
 		config.registerTypeWithKryoSerializer(type, serializer);
 	}
 
@@ -987,10 +988,10 @@ public abstract class ExecutionEnvironment {
 
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("Registered Kryo types: {}", Joiner.on(',').join(config.getRegisteredKryoTypes()));
-			LOG.debug("Registered Kryo with Serializers types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializers()));
-			LOG.debug("Registered Kryo with Serializer Classes types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializerClasses()));
-			LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers()));
-			LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses()));
+			LOG.debug("Registered Kryo with Serializers types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializers().entrySet()));
+			LOG.debug("Registered Kryo with Serializer Classes types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializerClasses().entrySet()));
+			LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers().entrySet()));
+			LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses().entrySet()));
 			LOG.debug("Registered POJO types: {}", Joiner.on(',').join(config.getRegisteredPojoTypes()));
 
 			// print information about static code analysis

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 8ae3562..1bc6771 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -42,8 +42,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.lang.reflect.Modifier;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
+import java.util.Map;
 
 /**
  * A type serializer that serializes its type using the Kryo serialization
@@ -60,10 +61,10 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	// ------------------------------------------------------------------------
 
-	private final List<ExecutionConfig.Entry<Class<?>, Serializer<?>>> registeredTypesWithSerializers;
-	private final List<ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithSerializerClasses;
-	private final List<ExecutionConfig.Entry<Class<?>, Serializer<?>>> defaultSerializers;
-	private final List<ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>>> defaultSerializerClasses;
+	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
+	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
+	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
+	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
 	private final LinkedHashSet<Class<?>> registeredTypes;
 
 	private final Class<T> type;
@@ -264,11 +265,12 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 			// Add default serializers first, so that they type registrations without a serializer
 			// are registered with a default serializer
-			for(ExecutionConfig.Entry<Class<?>, Serializer<?>> serializer : defaultSerializers) {
-				kryo.addDefaultSerializer(serializer.getKey(), serializer.getValue());
+			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
+				kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
 			}
-			for(ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>> serializer : defaultSerializerClasses) {
-				kryo.addDefaultSerializer(serializer.getKey(), serializer.getValue());
+
+			for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
+				kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
 			}
 
 			// register the type of our class
@@ -281,7 +283,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			}
 
 			// register given serializer classes
-			for (ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses) {
+			for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
 				Class<?> typeClass = e.getKey();
 				Class<? extends Serializer<?>> serializerClass = e.getValue();
 
@@ -291,8 +293,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			}
 
 			// register given serializers
-			for (ExecutionConfig.Entry<Class<?>, Serializer<?>> e : registeredTypesWithSerializers) {
-				kryo.register(e.getKey(), e.getValue());
+			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
+				kryo.register(e.getKey(), e.getValue().getSerializer());
 			}
 			// this is needed for Avro but can not be added on demand.
 			kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 39bb717..344b186 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -153,7 +153,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
    * because it may be distributed to the worker nodes by java serialization.
    */
-  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
+  def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
+      clazz: Class[_],
+      serializer: T)
+    : Unit = {
     javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
   }
 
@@ -178,7 +181,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
    * because it may be distributed to the worker nodes by java serialization.
    */
-  def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
+  def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
+      clazz: Class[_],
+      serializer: T)
+    : Unit = {
     javaEnv.addDefaultKryoSerializer(clazz, serializer)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index 29fa225..ab30ef3 100644
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -242,7 +242,9 @@ public class AvroRecordInputFormatTest {
 
 		TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
 		Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
-		Assert.assertEquals(new ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>>(Schema.class, Serializers.AvroSchemaSerializer.class), ec.getDefaultKryoSerializerClasses().get(0));
+		Assert.assertTrue(
+			ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
+			ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
 		ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
 		tser.serialize(rec, target);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f052389..ff785c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -70,6 +70,7 @@ import org.apache.flink.util.SplittableIterator;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -443,7 +444,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @param serializer
 	 * 		The serializer to use.
 	 */
-	public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
+	public <T extends Serializer<?> & Serializable>void addDefaultKryoSerializer(Class<?> type, T serializer) {
 		config.addDefaultKryoSerializer(type, serializer);
 	}
 
@@ -472,7 +473,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @param serializer
 	 * 		The serializer to use.
 	 */
-	public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
+	public <T extends Serializer<?> & Serializable>void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
 		config.registerTypeWithKryoSerializer(type, serializer);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index e983451..0eb3297 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -243,7 +243,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * @param serializer
    * The serializer to use.
    */
-  def addDefaultKryoSerializer(`type`: Class[_], serializer: Serializer[_]) {
+  def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
+      `type`: Class[_],
+      serializer: T)
+    : Unit = {
     javaEnv.addDefaultKryoSerializer(`type`, serializer)
   }
 
@@ -265,7 +268,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
    * because it may be distributed to the worker nodes by java serialization.
    */
-  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
+  def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
+      clazz: Class[_],
+      serializer: T)
+    : Unit = {
     javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
new file mode 100644
index 0000000..47f1fc0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class RegisterTypeWithKryoSerializerITCase extends MultipleProgramsTestBase {
+
+	public RegisterTypeWithKryoSerializerITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Tests whether the kryo serializer is forwarded via the ExecutionConfig.
+	 * @throws Exception
+	 */
+	@Test
+	public void testRegisterTypeWithKryoSerializer() throws Exception {
+		int numElements = 10;
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		env.registerTypeWithKryoSerializer(TestClass.class, new TestClassSerializer());
+
+		DataSet<Long> input = env.generateSequence(0, numElements - 1);
+
+		DataSet<TestClass> mapped = input.map(new MapFunction<Long, TestClass>() {
+			private static final long serialVersionUID = -529116076312998262L;
+
+			@Override
+			public TestClass map(Long value) throws Exception {
+				return new TestClass(value);
+			}
+		});
+
+		List<TestClass> expected = new ArrayList<>(numElements);
+
+		for (int i = 0; i < numElements; i++) {
+			expected.add(new TestClass(42));
+		}
+
+		compareResultCollections(expected, mapped.collect(), new Comparator<TestClass>() {
+			@Override
+			public int compare(TestClass o1, TestClass o2) {
+				return (int)(o1.getValue() - o2.getValue());
+			}
+		});
+	}
+
+	static class TestClass {
+		private final long value;
+		private Object obj = new Object();
+
+		public TestClass(long value) {
+			this.value = value;
+		}
+
+		public long getValue() {
+			return value;
+		}
+
+		@Override
+		public String toString() {
+			return "TestClass(" + value + ")";
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof TestClass) {
+				TestClass other = (TestClass) obj;
+
+				return value == other.value;
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return (int)value;
+		}
+	}
+
+	static class TestClassSerializer extends Serializer<TestClass> implements Serializable {
+
+		private static final long serialVersionUID = -3585880741695717533L;
+
+		@Override
+		public void write(Kryo kryo, Output output, TestClass testClass) {
+			output.writeLong(42);
+		}
+
+		@Override
+		public TestClass read(Kryo kryo, Input input, Class<TestClass> aClass) {
+			return new TestClass(input.readLong());
+		}
+	}
+}