You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/15 14:50:35 UTC
[1/4] flink git commit: [FLINK-2373] Add configuration parameter to
createRemoteEnvironment method
Repository: flink
Updated Branches:
refs/heads/master 8a75025ab -> 2cc5d98c6
[FLINK-2373] Add configuration parameter to createRemoteEnvironment method
This closes #1066
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e78b80c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e78b80c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e78b80c4
Branch: refs/heads/master
Commit: e78b80c4e4a99385ae5ef5a9afc9f8a64533a9e3
Parents: 8a75025
Author: Andreas Kunft <an...@tu-berlin.de>
Authored: Tue Jul 28 16:58:32 2015 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:19:04 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/RemoteExecutor.java | 41 +++--
.../RemoteExecutorHostnameResolutionTest.java | 3 +-
.../apache/flink/api/common/PlanExecutor.java | 8 +-
.../flink/api/java/ExecutionEnvironment.java | 22 +++
.../flink/api/java/RemoteEnvironment.java | 11 +-
.../flink/api/scala/ExecutionEnvironment.scala | 33 +++-
.../api/java/ScalaShellRemoteEnvironment.java | 3 +-
.../ExecutionEnvironmentITCase.java | 4 +-
.../RemoteEnvironmentITCase.java | 162 +++++++++++++++++++
9 files changed, 263 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index d1be6d2..20169f6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -54,30 +54,47 @@ public class RemoteExecutor extends PlanExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
private final List<String> jarFiles;
- private final Configuration configuration;
+
+ private final Configuration clientConfiguration;
public RemoteExecutor(String hostname, int port) {
- this(hostname, port, Collections.<String>emptyList());
+ this(hostname, port, Collections.<String>emptyList(), new Configuration());
}
public RemoteExecutor(String hostname, int port, String jarFile) {
- this(hostname, port, Collections.singletonList(jarFile));
+ this(hostname, port, Collections.singletonList(jarFile), new Configuration());
}
public RemoteExecutor(String hostport, String jarFile) {
- this(getInetFromHostport(hostport), Collections.singletonList(jarFile));
+ this(getInetFromHostport(hostport), Collections.singletonList(jarFile), new Configuration());
}
public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
- this(new InetSocketAddress(hostname, port), jarFiles);
+ this(new InetSocketAddress(hostname, port), jarFiles, new Configuration());
+ }
+
+ public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) {
+ this(hostname, port, Collections.<String>emptyList(), clientConfiguration);
+ }
+
+ public RemoteExecutor(String hostname, int port, String jarFile, Configuration clientConfiguration) {
+ this(hostname, port, Collections.singletonList(jarFile), clientConfiguration);
+ }
+
+ public RemoteExecutor(String hostport, String jarFile, Configuration clientConfiguration) {
+ this(getInetFromHostport(hostport), Collections.singletonList(jarFile), clientConfiguration);
+ }
+
+ public RemoteExecutor(String hostname, int port, List<String> jarFiles, Configuration clientConfiguration) {
+ this(new InetSocketAddress(hostname, port), jarFiles, clientConfiguration);
}
- public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
+ public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles, Configuration clientConfiguration) {
this.jarFiles = jarFiles;
- configuration = new Configuration();
+ this.clientConfiguration = clientConfiguration;
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
+ clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
+ clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
}
@Override
@@ -87,7 +104,7 @@ public class RemoteExecutor extends PlanExecutor {
}
public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
- Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
+ Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
JobSubmissionResult result = c.run(p, -1, true);
@@ -103,7 +120,7 @@ public class RemoteExecutor extends PlanExecutor {
File jarFile = new File(jarPath);
PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
- Client c = new Client(configuration, program.getUserCodeClassLoader(), -1);
+ Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
@@ -118,7 +135,7 @@ public class RemoteExecutor extends PlanExecutor {
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
JobWithJars p = new JobWithJars(plan, this.jarFiles);
- Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
+ Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index 47236af..7f67567 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.client.program.ProgramInvocationException;
import org.junit.Test;
@@ -67,7 +68,7 @@ public class RemoteExecutorHostnameResolutionTest {
try {
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
- RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
+ RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList(), new Configuration());
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 74bdb09..1294011 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -105,11 +105,12 @@ public abstract class PlanExecutor {
*
* @param hostname The address of the JobManager to send the program to.
* @param port The port of the JobManager to send the program to.
+ * @param clientConfiguration The configuration for the client (Akka, default.parallelism).
* @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
* from within the UDFs.
* @return A remote executor.
*/
- public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) {
+ public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, String... jarFiles) {
if (hostname == null) {
throw new IllegalArgumentException("The hostname must not be null.");
}
@@ -123,7 +124,10 @@ public abstract class PlanExecutor {
: Arrays.asList(jarFiles);
try {
- return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files);
+ PlanExecutor executor = (clientConfiguration == null) ?
+ reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files) :
+ reClass.getConstructor(String.class, int.class, List.class, Configuration.class).newInstance(hostname, port, files, clientConfiguration);
+ return executor;
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the remote executor ("
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 32049c4..297982c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1106,6 +1106,28 @@ public abstract class ExecutionEnvironment {
}
/**
+ * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program must be accessible from the
+ * cluster. The custom configuration is used to configure Akka-specific configuration parameters
+ * for the client only; program parallelism can be set via {@link ExecutionEnvironment#setParallelism(int)}.
+ *
+ * Cluster configuration has to be done in the remotely running Flink instance.
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param clientConfiguration Pass a custom configuration to the Client.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+ * user-defined functions, user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ public static ExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) {
+ RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles);
+ rec.setClientConfiguration(clientConfiguration);
+ return rec;
+ }
+
+ /**
* Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible from the
* cluster. The execution will use the specified parallelism.
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 6f84077..1f73e93 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.configuration.Configuration;
/**
* An {@link ExecutionEnvironment} that sends programs
@@ -35,6 +36,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
protected final int port;
private final String[] jarFiles;
+
+ private Configuration clientConfiguration;
/**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the
@@ -65,7 +68,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
this.lastJobExecutionResult = executor.executePlan(p);
@@ -78,7 +81,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
p.setDefaultParallelism(getParallelism());
registerCachedFilesWithPlan(p);
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
return executor.getOptimizerPlanAsJSON(p);
}
@@ -87,4 +90,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
}
+
+ public void setClientConfiguration(Configuration clientConfiguration) {
+ this.clientConfiguration = clientConfiguration;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index cdf7211..39bb717 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -134,7 +134,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def getId: UUID = {
javaEnv.getId
}
-
+
/**
* Gets the JobExecutionResult of the last executed job.
*/
@@ -181,13 +181,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
javaEnv.addDefaultKryoSerializer(clazz, serializer)
}
-
+
/**
* Registers the given type with the serialization stack. If the type is eventually
* serialized as a POJO, then the type is registered with the POJO serializer. If the
* type ends up being serialized with Kryo, then it will be registered at Kryo to make
* sure that only tags are written.
- *
+ *
*/
def registerType(typeClass: Class[_]) {
javaEnv.registerType(typeClass)
@@ -707,5 +707,32 @@ object ExecutionEnvironment {
javaEnv.setParallelism(parallelism)
new ExecutionEnvironment(javaEnv)
}
+
+ /**
+ * Creates a remote execution environment. The remote environment sends (parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program must be accessible
+ * from the cluster. The custom configuration is used to configure Akka-specific
+ * configuration parameters for the client only; program parallelism can be set via
+ * [[ExecutionEnvironment.setParallelism]].
+ *
+ * Cluster configuration has to be done in the remotely running Flink instance.
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be
+ * executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param clientConfiguration Pass a custom configuration to the Client.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+ * program uses user-defined functions, user-defined input formats, or any
+ * libraries, those must be provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ def createRemoteEnvironment(
+ host: String,
+ port: Int,
+ clientConfiguration: Configuration,
+ jarFiles: String*): ExecutionEnvironment = {
+ val javaEnv = JavaEnv.createRemoteEnvironment(host, port, clientConfiguration, jarFiles: _*)
+ new ExecutionEnvironment(javaEnv)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 54af5bc..a7dc708 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.scala.FlinkILoop;
+import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -78,7 +79,7 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
alljars.add(jarFile);
String[] alljarsArr = new String[alljars.size()];
alljarsArr = alljars.toArray(alljarsArr);
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, alljarsArr);
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), alljarsArr);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
index 109af1e..f4b3875 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
@@ -80,9 +80,7 @@ public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
out.collect(getRuntimeContext().getIndexOfThisSubtask());
}
});
- List<Integer> resultCollection = new ArrayList<Integer>();
- result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
- env.execute();
+ List<Integer> resultCollection = result.collect();
assertEquals(PARALLELISM, resultCollection.size());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e78b80c4/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
new file mode 100644
index 0000000..68b099d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class RemoteEnvironmentITCase {
+
+ private static final int TM_SLOTS = 4;
+
+ private static final int NUM_TM = 1;
+
+ private static final int USER_DOP = 2;
+
+ private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
+
+ private static final String VALID_STARTUP_TIMEOUT = "100 s";
+
+ private static ForkableFlinkMiniCluster cluster;
+
+ @BeforeClass
+ public static void setupCluster() {
+ try {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+ cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster.start();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Error starting test cluster: " + e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDownCluster() {
+ try {
+ cluster.stop();
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ fail("Cluster shutdown caused an exception: " + t.getMessage());
+ }
+ }
+
+ /**
+ * Ensure that Akka configuration parameters can be set.
+ */
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidAkkaConfiguration() throws Throwable {
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
+
+ final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+ cluster.hostname(),
+ cluster.getLeaderRPCPort(),
+ config
+ );
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<String> result = env.createInput(new TestNonRichInputFormat());
+ result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
+ try {
+ env.execute();
+ Assert.fail("Program should not run successfully, cause of invalid akka settings.");
+ } catch (ProgramInvocationException ex) {
+ throw ex.getCause();
+ }
+ }
+
+ /**
+ * Ensure that the program parallelism can be set even if the configuration is supplied.
+ */
+ @Test
+ public void testUserSpecificParallelism() throws Exception {
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
+
+ final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+ cluster.hostname(),
+ cluster.getLeaderRPCPort(),
+ config
+ );
+ env.setParallelism(USER_DOP);
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
+ .rebalance()
+ .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
+ @Override
+ public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+ out.collect(getRuntimeContext().getIndexOfThisSubtask());
+ }
+ });
+ List<Integer> resultCollection = result.collect();
+ assertEquals(USER_DOP, resultCollection.size());
+ }
+
+ private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+ private transient boolean emitted;
+
+ @Override
+ public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+ assertEquals(USER_DOP, numSplits);
+ return super.createInputSplits(numSplits);
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return emitted;
+ }
+
+ @Override
+ public Integer nextRecord(Integer reuse) {
+ if (emitted) {
+ return null;
+ }
+ emitted = true;
+ return 1;
+ }
+ }
+}
[2/4] flink git commit: [FLINK-2655] Minimize intermediate merging of
external merge sort.
Posted by fh...@apache.org.
[FLINK-2655] Minimize intermediate merging of external merge sort.
This closes #1118
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a75dd627
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a75dd627
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a75dd627
Branch: refs/heads/master
Commit: a75dd6270fd47720ae1c2ce9464ff6b8f7b43d39
Parents: e78b80c
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 10 10:16:35 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:20:01 2015 +0200
----------------------------------------------------------------------
.../operators/sort/UnilateralSortMerger.java | 47 +++++++++++---------
1 file changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a75dd627/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index fd1062d..32fbb52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1513,11 +1513,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
}
/**
- * Merges the given sorted runs to a smaller number of sorted runs.
- *
+ * Merges the given sorted runs to a smaller number of sorted runs.
+ *
* @param channelIDs The IDs of the sorted runs that need to be merged.
+ * @param allReadBuffers The memory segments from which the read buffers for the merging step are allocated.
* @param writeBuffers The buffers to be used by the writers.
-
* @return A list of the IDs of the merged channels.
* @throws IOException Thrown, if the readers or writers encountered an I/O problem.
*/
@@ -1525,34 +1525,41 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers)
throws IOException
{
- final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxFanIn));
- final int channelsToMergePerStep = (int) Math.ceil(channelIDs.size() / numMerges);
-
+ // A channel list with length maxFanIn^i can be merged to maxFanIn files in i-1 rounds where every merge
+ // is a full merge with maxFanIn input channels. A partial round includes merges with fewer than maxFanIn
+ // inputs. It is most efficient to perform the partial round first.
+ final double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(this.maxFanIn)) - 1;
+
+ final int numStart = channelIDs.size();
+ final int numEnd = (int) Math.pow(this.maxFanIn, scale);
+
+ final int numMerges = (int) Math.ceil((numStart - numEnd) / (double) (this.maxFanIn - 1));
+
+ final int numNotMerged = numEnd - numMerges;
+ final int numToMerge = numStart - numNotMerged;
+
+ // unmerged channel IDs are copied directly to the result list
+ final List<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>(numEnd);
+ mergedChannelIDs.addAll(channelIDs.subList(0, numNotMerged));
+
+ final int channelsToMergePerStep = (int) Math.ceil(numToMerge / (double) numMerges);
+
// allocate the memory for the merging step
final List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelsToMergePerStep);
getSegmentsForReaders(readBuffers, allReadBuffers, channelsToMergePerStep);
-
- // the list containing the IDs of the merged channels
- final ArrayList<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>((int) (numMerges + 1));
- final ArrayList<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
- int channelNum = 0;
+ final List<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
+ int channelNum = numNotMerged;
while (isRunning() && channelNum < channelIDs.size()) {
channelsToMergeThisStep.clear();
for (int i = 0; i < channelsToMergePerStep && channelNum < channelIDs.size(); i++, channelNum++) {
channelsToMergeThisStep.add(channelIDs.get(channelNum));
}
-
- // merge only, if there is more than one channel
- if (channelsToMergeThisStep.size() < 2) {
- mergedChannelIDs.addAll(channelsToMergeThisStep);
- }
- else {
- mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
- }
+
+ mergedChannelIDs.add(mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
}
-
+
return mergedChannelIDs;
}
[3/4] flink git commit: [FLINK-2533] [java-api] Gap based random
sample optimization.
Posted by fh...@apache.org.
[FLINK-2533] [java-api] Gap based random sample optimization.
This closes #1110
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c923fb3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c923fb3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c923fb3c
Branch: refs/heads/master
Commit: c923fb3c1c1d61462e1079198ae9fb735bb0acf2
Parents: a75dd62
Author: gallenvara <ga...@126.com>
Authored: Mon Sep 7 14:55:11 2015 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:20:41 2015 +0200
----------------------------------------------------------------------
.../api/java/sampling/BernoulliSampler.java | 36 ++++++++--
.../flink/api/java/sampling/PoissonSampler.java | 74 +++++++++++++++-----
.../flink/api/java/sampling/RandomSampler.java | 2 +
3 files changed, 87 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
index 0f5ecc6..99ea5de 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
@@ -28,12 +28,16 @@ import java.util.Random;
* Bernoulli experiment.
*
* @param <T> The type of sample.
+ * @see <a href="http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/">Gap Sampling</a>
*/
public class BernoulliSampler<T> extends RandomSampler<T> {
private final double fraction;
private final Random random;
+ // THRESHOLD is a tuning parameter for choosing sampling method according to the fraction.
+ private final static double THRESHOLD = 0.33;
+
/**
* Create a Bernoulli sampler with sample fraction and default random number generator.
*
@@ -102,15 +106,35 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
}
private T getNextSampledElement() {
- while (input.hasNext()) {
- T element = input.next();
+ if (fraction <= THRESHOLD) {
+ double rand = random.nextDouble();
+ double u = Math.max(rand, EPSILON);
+ int gap = (int) (Math.log(u) / Math.log(1 - fraction));
+ int elementCount = 0;
+ if (input.hasNext()) {
+ T element = input.next();
+ while (input.hasNext() && elementCount < gap) {
+ element = input.next();
+ elementCount++;
+ }
+ if (elementCount < gap) {
+ return null;
+ } else {
+ return element;
+ }
+ } else {
+ return null;
+ }
+ } else {
+ while (input.hasNext()) {
+ T element = input.next();
- if (random.nextDouble() <= fraction) {
- return element;
+ if (random.nextDouble() <= fraction) {
+ return element;
+ }
}
+ return null;
}
-
- return null;
}
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
index 3834d24..8701167 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
import org.apache.commons.math3.distribution.PoissonDistribution;
import java.util.Iterator;
+import java.util.Random;
/**
* A sampler implementation based on the Poisson Distribution. While sampling elements with fraction
@@ -28,11 +29,16 @@ import java.util.Iterator;
*
* @param <T> The type of sample.
* @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
+ * @see <a href="http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/">Gap Sampling</a>
*/
public class PoissonSampler<T> extends RandomSampler<T> {
private PoissonDistribution poissonDistribution;
private final double fraction;
+ private final Random random;
+
+ // THRESHOLD is a tuning parameter for choosing sampling method according to the fraction.
+ private final static double THRESHOLD = 0.4;
/**
* Create a poisson sampler which can sample elements with replacement.
@@ -47,6 +53,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
this.poissonDistribution = new PoissonDistribution(fraction);
this.poissonDistribution.reseedRandomGenerator(seed);
}
+ this.random = new Random(seed);
}
/**
@@ -60,6 +67,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
if (this.fraction > 0) {
this.poissonDistribution = new PoissonDistribution(fraction);
}
+ this.random = new Random();
}
/**
@@ -84,8 +92,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
if (currentCount > 0) {
return true;
} else {
- moveToNextElement();
-
+ samplingProcess();
if (currentCount > 0) {
return true;
} else {
@@ -93,28 +100,57 @@ public class PoissonSampler<T> extends RandomSampler<T> {
}
}
}
-
- private void moveToNextElement() {
- while (input.hasNext()) {
- currentElement = input.next();
- currentCount = poissonDistribution.sample();
- if (currentCount > 0) {
- break;
- }
- }
- }
@Override
public T next() {
- if (currentCount == 0) {
- moveToNextElement();
+ if (currentCount <= 0) {
+ samplingProcess();
}
-
- if (currentCount == 0) {
- return null;
+ currentCount--;
+ return currentElement;
+ }
+
+ public int poisson_ge1(double p){
+ // sample 'k' from Poisson(p), conditioned to k >= 1.
+ double q = Math.pow(Math.E, -p);
+ // simulate a poisson trial such that k >= 1.
+ double t = q + (1 - q) * random.nextDouble();
+ int k = 1;
+ // continue standard poisson generation trials.
+ t = t * random.nextDouble();
+ while (t > q) {
+ k++;
+ t = t * random.nextDouble();
+ }
+ return k;
+ }
+
+ private void skipGapElements(int num) {
+ // skip the elements that occurrence number is zero.
+ int elementCount = 0;
+ while (input.hasNext() && elementCount < num){
+ currentElement = input.next();
+ elementCount++;
+ }
+ }
+
+ private void samplingProcess(){
+ if (fraction <= THRESHOLD) {
+ double u = Math.max(random.nextDouble(), EPSILON);
+ int gap = (int) (Math.log(u) / -fraction);
+ skipGapElements(gap);
+ if (input.hasNext()) {
+ currentElement = input.next();
+ currentCount = poisson_ge1(fraction);
+ }
} else {
- currentCount--;
- return currentElement;
+ while (input.hasNext()){
+ currentElement = input.next();
+ currentCount = poissonDistribution.sample();
+ if (currentCount > 0) {
+ break;
+ }
+ }
}
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/c923fb3c/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
index 5fe2920..7d74897 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
* @param <T> The type of sampler data.
*/
public abstract class RandomSampler<T> {
+
+ protected final static double EPSILON = 1e-5;
protected final Iterator<T> EMPTY_ITERABLE = new SampledIterator<T>() {
@Override
[4/4] flink git commit: [FLINK-2665] [api] [runtime] Makes
ExecutionConfig serializable by forcing Kryo serializers to be Serializable
Posted by fh...@apache.org.
[FLINK-2665] [api] [runtime] Makes ExecutionConfig serializable by forcing Kryo serializers to be Serializable
This closes #1128
-----
Closing PR:
This closes #867
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cc5d98c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cc5d98c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cc5d98c
Branch: refs/heads/master
Commit: 2cc5d98c690c1b1b1ff1f48628ca58b1b4f0c932
Parents: c923fb3
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 14 16:59:22 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 15 12:33:31 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 118 ++++-------------
.../flink/api/java/ExecutionEnvironment.java | 13 +-
.../typeutils/runtime/kryo/KryoSerializer.java | 26 ++--
.../flink/api/scala/ExecutionEnvironment.scala | 10 +-
.../api/io/avro/AvroRecordInputFormatTest.java | 4 +-
.../environment/StreamExecutionEnvironment.java | 5 +-
.../api/scala/StreamExecutionEnvironment.scala | 10 +-
.../RegisterTypeWithKryoSerializerITCase.java | 129 +++++++++++++++++++
8 files changed, 197 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index d93eba6..4ae0b8d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -21,9 +21,8 @@ package org.apache.flink.api.common;
import com.esotericsoftware.kryo.Serializer;
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Map;
/**
@@ -94,19 +93,15 @@ public class ExecutionConfig implements Serializable {
private boolean timestampsEnabled = false;
// Serializers and types registered with Kryo and the PojoSerializer
- // we store them in lists to ensure they are registered in order in all kryo instances.
+ // we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
- private final List<Entry<Class<?>, Serializer<?>>> registeredTypesWithKryoSerializers =
- new ArrayList<Entry<Class<?>, Serializer<?>>>();
+ private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<Class<?>, SerializableSerializer<?>>();
- private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses =
- new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+ private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>();
- private final List<Entry<Class<?>, Serializer<?>>> defaultKryoSerializers =
- new ArrayList<Entry<Class<?>, Serializer<?>>>();
+ private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<Class<?>, SerializableSerializer<?>>();
- private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses =
- new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+ private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>();
private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<Class<?>>();
@@ -457,19 +452,12 @@ public class ExecutionConfig implements Serializable {
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
- public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
+ public <T extends Serializer<?> & Serializable>void addDefaultKryoSerializer(Class<?> type, T serializer) {
if (type == null || serializer == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
- if (!(serializer instanceof java.io.Serializable)) {
- throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), "
- + "as defined by java.io.Serializable. For stateless serializers, you can use the "
- + "'registerSerializer(Class, Class)' method to register the serializer via its class.");
- }
- Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, Serializer<?>>(type, serializer);
- if(!defaultKryoSerializers.contains(e)) {
- defaultKryoSerializers.add(e);
- }
+
+ defaultKryoSerializers.put(type, new SerializableSerializer<T>(serializer));
}
/**
@@ -482,10 +470,7 @@ public class ExecutionConfig implements Serializable {
if (type == null || serializerClass == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
- Entry<Class<?>, Class<? extends Serializer<?>>> e = new Entry<Class<?>, Class<? extends Serializer<?>>>(type, serializerClass);
- if(!defaultKryoSerializerClasses.contains(e)) {
- defaultKryoSerializerClasses.add(e);
- }
+ defaultKryoSerializerClasses.put(type, serializerClass);
}
/**
@@ -497,19 +482,12 @@ public class ExecutionConfig implements Serializable {
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
- public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
+ public <T extends Serializer<?> & Serializable>void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
if (type == null || serializer == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
- if (!(serializer instanceof java.io.Serializable)) {
- throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), "
- + "as defined by java.io.Serializable. For stateless serializers, you can use the "
- + "'registerSerializer(Class, Class)' method to register the serializer via its class.");
- }
- Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, Serializer<?>>(type, serializer);
- if(!registeredTypesWithKryoSerializers.contains(e)) {
- registeredTypesWithKryoSerializers.add(e);
- }
+
+ registeredTypesWithKryoSerializers.put(type, new SerializableSerializer<T>(serializer));
}
/**
@@ -522,10 +500,7 @@ public class ExecutionConfig implements Serializable {
if (type == null || serializerClass == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
- Entry<Class<?>, Class<? extends Serializer<?>>> e = new Entry<Class<?>, Class<? extends Serializer<?>>>(type, serializerClass);
- if(!registeredTypesWithKryoSerializerClasses.contains(e)) {
- registeredTypesWithKryoSerializerClasses.add(e);
- }
+ registeredTypesWithKryoSerializerClasses.put(type, serializerClass);
}
/**
@@ -563,14 +538,14 @@ public class ExecutionConfig implements Serializable {
/**
* Returns the registered types with Kryo Serializers.
*/
- public List<Entry<Class<?>, Serializer<?>>> getRegisteredTypesWithKryoSerializers() {
+ public LinkedHashMap<Class<?>, SerializableSerializer<?>> getRegisteredTypesWithKryoSerializers() {
return registeredTypesWithKryoSerializers;
}
/**
* Returns the registered types with their Kryo Serializer classes.
*/
- public List<Entry<Class<?>, Class<? extends Serializer<?>>>> getRegisteredTypesWithKryoSerializerClasses() {
+ public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getRegisteredTypesWithKryoSerializerClasses() {
return registeredTypesWithKryoSerializerClasses;
}
@@ -578,14 +553,14 @@ public class ExecutionConfig implements Serializable {
/**
* Returns the registered default Kryo Serializers.
*/
- public List<Entry<Class<?>, Serializer<?>>> getDefaultKryoSerializers() {
+ public LinkedHashMap<Class<?>, SerializableSerializer<?>> getDefaultKryoSerializers() {
return defaultKryoSerializers;
}
/**
* Returns the registered default Kryo Serializer classes.
*/
- public List<Entry<Class<?>, Class<? extends Serializer<?>>>> getDefaultKryoSerializerClasses() {
+ public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getDefaultKryoSerializerClasses() {
return defaultKryoSerializerClasses;
}
@@ -633,60 +608,17 @@ public class ExecutionConfig implements Serializable {
// ------------------------------ Utilities ----------------------------------
- public static class Entry<K, V> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final K k;
- private final V v;
-
- public Entry(K k, V v) {
- this.k = k;
- this.v = v;
- }
-
- public K getKey() {
- return k;
- }
-
- public V getValue() {
- return v;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
+ public static class SerializableSerializer<T extends Serializer<?> & Serializable> implements Serializable {
+ private static final long serialVersionUID = 4687893502781067189L;
- Entry entry = (Entry) o;
+ private T serializer;
- if (k != null ? !k.equals(entry.k) : entry.k != null) {
- return false;
- }
- if (v != null ? !v.equals(entry.v) : entry.v != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = k != null ? k.hashCode() : 0;
- result = 31 * result + (v != null ? v.hashCode() : 0);
- return result;
+ public SerializableSerializer(T serializer) {
+ this.serializer = serializer;
}
- @Override
- public String toString() {
- return "Entry{" +
- "k=" + k +
- ", v=" + v +
- '}';
+ public T getSerializer() {
+ return serializer;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 297982c..64b3a1d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.java;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@@ -271,7 +272,7 @@ public abstract class ExecutionEnvironment {
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
- public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
+ public <T extends Serializer<?> & Serializable>void addDefaultKryoSerializer(Class<?> type, T serializer) {
config.addDefaultKryoSerializer(type, serializer);
}
@@ -294,7 +295,7 @@ public abstract class ExecutionEnvironment {
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
- public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
+ public <T extends Serializer<?> & Serializable>void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
config.registerTypeWithKryoSerializer(type, serializer);
}
@@ -987,10 +988,10 @@ public abstract class ExecutionEnvironment {
if(LOG.isDebugEnabled()) {
LOG.debug("Registered Kryo types: {}", Joiner.on(',').join(config.getRegisteredKryoTypes()));
- LOG.debug("Registered Kryo with Serializers types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializers()));
- LOG.debug("Registered Kryo with Serializer Classes types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializerClasses()));
- LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers()));
- LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses()));
+ LOG.debug("Registered Kryo with Serializers types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializers().entrySet()));
+ LOG.debug("Registered Kryo with Serializer Classes types: {}", Joiner.on(',').join(config.getRegisteredTypesWithKryoSerializerClasses().entrySet()));
+ LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers().entrySet()));
+ LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses().entrySet()));
LOG.debug("Registered POJO types: {}", Joiner.on(',').join(config.getRegisteredPojoTypes()));
// print information about static code analysis
http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 8ae3562..1bc6771 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -42,8 +42,9 @@ import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.Modifier;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.List;
+import java.util.Map;
/**
* A type serializer that serializes its type using the Kryo serialization
@@ -60,10 +61,10 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
// ------------------------------------------------------------------------
- private final List<ExecutionConfig.Entry<Class<?>, Serializer<?>>> registeredTypesWithSerializers;
- private final List<ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithSerializerClasses;
- private final List<ExecutionConfig.Entry<Class<?>, Serializer<?>>> defaultSerializers;
- private final List<ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>>> defaultSerializerClasses;
+ private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
+ private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
+ private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
+ private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
private final LinkedHashSet<Class<?>> registeredTypes;
private final Class<T> type;
@@ -264,11 +265,12 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
// Add default serializers first, so that they type registrations without a serializer
// are registered with a default serializer
- for(ExecutionConfig.Entry<Class<?>, Serializer<?>> serializer : defaultSerializers) {
- kryo.addDefaultSerializer(serializer.getKey(), serializer.getValue());
+ for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
+ kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
}
- for(ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>> serializer : defaultSerializerClasses) {
- kryo.addDefaultSerializer(serializer.getKey(), serializer.getValue());
+
+ for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
+ kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
}
// register the type of our class
@@ -281,7 +283,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
// register given serializer classes
- for (ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses) {
+ for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
Class<?> typeClass = e.getKey();
Class<? extends Serializer<?>> serializerClass = e.getValue();
@@ -291,8 +293,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
// register given serializers
- for (ExecutionConfig.Entry<Class<?>, Serializer<?>> e : registeredTypesWithSerializers) {
- kryo.register(e.getKey(), e.getValue());
+ for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
+ kryo.register(e.getKey(), e.getValue().getSerializer());
}
// this is needed for Avro but can not be added on demand.
kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());
http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 39bb717..344b186 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -153,7 +153,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* Note that the serializer instance must be serializable (as defined by java.io.Serializable),
* because it may be distributed to the worker nodes by java serialization.
*/
- def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
+ def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
+ clazz: Class[_],
+ serializer: T)
+ : Unit = {
javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
}
@@ -178,7 +181,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* Note that the serializer instance must be serializable (as defined by java.io.Serializable),
* because it may be distributed to the worker nodes by java serialization.
*/
- def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
+ def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
+ clazz: Class[_],
+ serializer: T)
+ : Unit = {
javaEnv.addDefaultKryoSerializer(clazz, serializer)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index 29fa225..ab30ef3 100644
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -242,7 +242,9 @@ public class AvroRecordInputFormatTest {
TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
- Assert.assertEquals(new ExecutionConfig.Entry<Class<?>, Class<? extends Serializer<?>>>(Schema.class, Serializers.AvroSchemaSerializer.class), ec.getDefaultKryoSerializerClasses().get(0));
+ Assert.assertTrue(
+ ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
+ ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
tser.serialize(rec, target);
http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f052389..ff785c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -70,6 +70,7 @@ import org.apache.flink.util.SplittableIterator;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@@ -443,7 +444,7 @@ public abstract class StreamExecutionEnvironment {
* @param serializer
* The serializer to use.
*/
- public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
+ public <T extends Serializer<?> & Serializable>void addDefaultKryoSerializer(Class<?> type, T serializer) {
config.addDefaultKryoSerializer(type, serializer);
}
@@ -472,7 +473,7 @@ public abstract class StreamExecutionEnvironment {
* @param serializer
* The serializer to use.
*/
- public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
+ public <T extends Serializer<?> & Serializable>void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
config.registerTypeWithKryoSerializer(type, serializer);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index e983451..0eb3297 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -243,7 +243,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* @param serializer
* The serializer to use.
*/
- def addDefaultKryoSerializer(`type`: Class[_], serializer: Serializer[_]) {
+ def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
+ `type`: Class[_],
+ serializer: T)
+ : Unit = {
javaEnv.addDefaultKryoSerializer(`type`, serializer)
}
@@ -265,7 +268,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Note that the serializer instance must be serializable (as defined by java.io.Serializable),
* because it may be distributed to the worker nodes by java serialization.
*/
- def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
+ def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
+ clazz: Class[_],
+ serializer: T)
+ : Unit = {
javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cc5d98c/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
new file mode 100644
index 0000000..47f1fc0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Integration test for registering a custom Kryo serializer on the
+ * {@link ExecutionEnvironment}.
+ *
+ * <p>The registered {@link TestClassSerializer} deliberately writes the constant 42 instead of
+ * the real value, so the collected result only matches the expectation if that serializer was
+ * actually applied to the job's elements.
+ */
+@RunWith(Parameterized.class)
+public class RegisterTypeWithKryoSerializerITCase extends MultipleProgramsTestBase {
+
+    public RegisterTypeWithKryoSerializerITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    /**
+     * Tests whether the kryo serializer is forwarded via the ExecutionConfig.
+     *
+     * <p>The job maps the sequence {@code 0 .. numElements - 1} to {@link TestClass} instances
+     * and collects them. Every collected element is expected to carry the value 42, which is
+     * what {@link TestClassSerializer#write} emits regardless of the element's real value.
+     *
+     * @throws Exception if building or executing the test job fails
+     */
+    @Test
+    public void testRegisterTypeWithKryoSerializer() throws Exception {
+        int numElements = 10;
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        env.registerTypeWithKryoSerializer(TestClass.class, new TestClassSerializer());
+
+        DataSet<Long> input = env.generateSequence(0, numElements - 1);
+
+        DataSet<TestClass> mapped = input.map(new MapFunction<Long, TestClass>() {
+            private static final long serialVersionUID = -529116076312998262L;
+
+            @Override
+            public TestClass map(Long value) throws Exception {
+                return new TestClass(value);
+            }
+        });
+
+        // One TestClass(42) per input element: the value the custom serializer writes.
+        List<TestClass> expected = new ArrayList<>(numElements);
+
+        for (int i = 0; i < numElements; i++) {
+            expected.add(new TestClass(42));
+        }
+
+        compareResultCollections(expected, mapped.collect(), new Comparator<TestClass>() {
+            @Override
+            public int compare(TestClass o1, TestClass o2) {
+                // Long.compare avoids the truncation of an (int) cast on a long difference,
+                // which could report a wrong sign or a false 0 for far-apart values.
+                return Long.compare(o1.getValue(), o2.getValue());
+            }
+        });
+    }
+
+    /**
+     * Element type of the test job; equality and hash code are based on {@code value} only.
+     */
+    static class TestClass {
+        private final long value;
+        // NOTE(review): never read in this file; presumably present to give the type a field
+        // that the custom serializer does not handle -- confirm before removing.
+        private Object obj = new Object();
+
+        public TestClass(long value) {
+            this.value = value;
+        }
+
+        public long getValue() {
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            return "TestClass(" + value + ")";
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            // Parameter is not named "obj" so it no longer shadows the field of that name.
+            if (o instanceof TestClass) {
+                TestClass that = (TestClass) o;
+
+                return value == that.value;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) value;
+        }
+    }
+
+    /**
+     * Kryo serializer for {@link TestClass} that is also {@link Serializable}.
+     *
+     * <p>It is intentionally lossy: {@link #write} always emits 42, which lets the test detect
+     * whether this serializer was used.
+     */
+    static class TestClassSerializer extends Serializer<TestClass> implements Serializable {
+
+        private static final long serialVersionUID = -3585880741695717533L;
+
+        /** Writes the constant 42; the value held by {@code testClass} is ignored on purpose. */
+        @Override
+        public void write(Kryo kryo, Output output, TestClass testClass) {
+            output.writeLong(42);
+        }
+
+        /** Reads one long from the input and wraps it in a new {@link TestClass}. */
+        @Override
+        public TestClass read(Kryo kryo, Input input, Class<TestClass> aClass) {
+            return new TestClass(input.readLong());
+        }
+    }
+}