Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/12 06:42:11 UTC

[flink] branch release-1.11 updated (2f4956f -> a6557f6)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2f4956f  [FLINK-18256][orc] Exclude ORC's Hadoop dependency and pull in provided vanilla hadoop in flink-orc
     new b1472a8  [hotfix] Improve exception message for parsing kryo serializer classes from config
     new a6557f6  [FLINK-18241] Use correct user class loader in OptimizerPlanEnvironment & StreamPlanEnvironment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-clients/pom.xml                              |   8 +
 .../client/program/OptimizerPlanEnvironment.java   |   4 +-
 .../flink/client/program/PackagedProgramUtils.java |  10 +-
 .../client/program/StreamPlanEnvironment.java      |   4 +-
 .../program/PackagedProgramUtilsPipelineTest.java  | 194 +++++++++++++++++++++
 .../client/program/PackagedProgramUtilsTest.java   |  97 +----------
 .../apache/flink/api/common/ExecutionConfig.java   |   6 +-
 .../flink/api/java/ExecutionEnvironment.java       |  13 +-
 .../environment/StreamExecutionEnvironment.java    |  15 +-
 9 files changed, 250 insertions(+), 101 deletions(-)
 create mode 100644 flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java


[flink] 02/02: [FLINK-18241] Use correct user class loader in OptimizerPlanEnvironment & StreamPlanEnvironment

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6557f66435973b65967af7dd1d893f748f7feae
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jun 11 12:08:15 2020 +0200

    [FLINK-18241] Use correct user class loader in OptimizerPlanEnvironment & StreamPlanEnvironment
    
    This closes #12607
---
 flink-clients/pom.xml                              |   8 +
 .../client/program/OptimizerPlanEnvironment.java   |   4 +-
 .../flink/client/program/PackagedProgramUtils.java |  10 +-
 .../client/program/StreamPlanEnvironment.java      |   4 +-
 .../program/PackagedProgramUtilsPipelineTest.java  | 194 +++++++++++++++++++++
 .../client/program/PackagedProgramUtilsTest.java   |  97 +----------
 .../flink/api/java/ExecutionEnvironment.java       |  13 +-
 .../environment/StreamExecutionEnvironment.java    |  15 +-
 8 files changed, 245 insertions(+), 100 deletions(-)
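
The gist of the fix, as the diffs below show, is that the plan-preview
environments are now created with the packaged program's user-code class
loader, so classes that exist only in the user jar (for example a default
Kryo serializer named in the configuration) can be resolved while the
pipeline is extracted. A minimal sketch of the calling side, assuming the
usual flink-clients/flink-core imports; the jar path and class names are
placeholders, not part of the commit:

    // Build a program whose classes live only in the user jar (placeholder path).
    PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/user-job.jar"))      // placeholder
        .setEntryPointClassName("com.example.MyJob")        // hypothetical entry class
        .build();

    Configuration config = new Configuration();
    // "com.example.UserSerializer" exists only inside the user jar, so it can
    // only be loaded through program.getUserCodeClassLoader().
    config.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, Collections.singletonList(
        "class:com.example.MyType,serializer:com.example.UserSerializer"));

    // With this change, getPipelineFromProgram hands the user class loader to
    // OptimizerPlanEnvironment/StreamPlanEnvironment, so the serializer entry
    // above can be resolved during plan extraction.
    Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
        program, config, 1 /* parallelism */, false /* suppress output */);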

diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 6505844..65a55fa 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -88,6 +88,14 @@ under the License.
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 	</dependencies>
 
 	<!-- More information on this:
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index 4f9b40e..20d2274 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -35,8 +35,8 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 		return pipeline;
 	}
 
-	public OptimizerPlanEnvironment(Configuration configuration, int parallelism) {
-		super(configuration);
+	public OptimizerPlanEnvironment(Configuration configuration, ClassLoader userClassloader, int parallelism) {
+		super(configuration, userClassloader);
 		if (parallelism > 0) {
 			setParallelism(parallelism);
 		}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 0db1686..2c4c7b6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -135,9 +135,15 @@ public enum PackagedProgramUtils {
 		}
 
 		// temporary hack to support the optimizer plan preview
-		OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(configuration, parallelism);
+		OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(
+			configuration,
+			program.getUserCodeClassLoader(),
+			parallelism);
 		benv.setAsContext();
-		StreamPlanEnvironment senv = new StreamPlanEnvironment(configuration, parallelism);
+		StreamPlanEnvironment senv = new StreamPlanEnvironment(
+			configuration,
+			program.getUserCodeClassLoader(),
+			parallelism);
 		senv.setAsContext();
 
 		try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
index b2ce783..7f086f9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
@@ -38,8 +38,8 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 		return pipeline;
 	}
 
-	public StreamPlanEnvironment(Configuration configuration, int parallelism) {
-		super(configuration);
+	public StreamPlanEnvironment(Configuration configuration, ClassLoader userClassLoader, int parallelism) {
+		super(configuration, userClassLoader);
 		if (parallelism > 0) {
 			setParallelism(parallelism);
 		}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
new file mode 100644
index 0000000..4c87db5
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.testutils.ClassLoaderUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PackagedProgramUtils} methods that should be executed for
+ * {@link StreamExecutionEnvironment} and {@link Environment}.
+ */
+@RunWith(Parameterized.class)
+public class PackagedProgramUtilsPipelineTest {
+
+	@Parameterized.Parameter
+	public TestParameter testParameter;
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Parameterized.Parameters
+	public static Collection<TestParameter> parameters() {
+		return Arrays.asList(
+			TestParameter.of(DataSetTestProgram.class, pipeline -> ((Plan) pipeline).getExecutionConfig()),
+			TestParameter.of(DataStreamTestProgram.class, pipeline -> ((StreamGraph) pipeline).getExecutionConfig())
+		);
+	}
+
+	/**
+	 * This tests whether configuration forwarding from a {@link Configuration} to the environment
+	 * works.
+	 */
+	@Test
+	public void testConfigurationForwarding() throws Exception {
+		// we want to test forwarding with this config, ensure that the default is what we expect.
+		assertThat(
+			ExecutionEnvironment.getExecutionEnvironment().getConfig().isAutoTypeRegistrationDisabled(),
+			is(false));
+
+		PackagedProgram packagedProgram = PackagedProgram.newBuilder()
+			.setEntryPointClassName(testParameter.entryClass().getName())
+			.build();
+
+		Configuration config = new Configuration();
+		config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false);
+
+		Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
+			packagedProgram,
+			config,
+			1 /* parallelism */,
+			false /* suppress output */);
+
+		ExecutionConfig executionConfig = testParameter.extractExecutionConfig(pipeline);
+
+		// we want to test forwarding with this config, ensure that the default is what we expect.
+		assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true));
+	}
+
+	@Test
+	public void testUserClassloaderForConfiguration() throws Exception {
+		String userSerializerClassName = "UserSerializer";
+		List<URL> userUrls = getClassUrls(userSerializerClassName);
+
+		PackagedProgram packagedProgram = PackagedProgram.newBuilder()
+			.setUserClassPaths(userUrls)
+			.setEntryPointClassName(testParameter.entryClass().getName())
+			.build();
+
+		Configuration config = new Configuration();
+		config.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, Collections.singletonList(
+			String.format(
+				"class:%s,serializer:%s",
+				PackagedProgramUtilsPipelineTest.class.getName(),
+				userSerializerClassName)
+		));
+
+		Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
+			packagedProgram,
+			config,
+			1 /* parallelism */,
+			false /* suppress output */);
+
+		ExecutionConfig executionConfig = testParameter.extractExecutionConfig(pipeline);
+
+		assertThat(
+			executionConfig.getDefaultKryoSerializerClasses().get(PackagedProgramUtilsPipelineTest.class).getName(),
+			is(userSerializerClassName));
+	}
+
+	private List<URL> getClassUrls(String className) throws IOException {
+		URLClassLoader urlClassLoader = ClassLoaderUtils.compileAndLoadJava(
+			temporaryFolder.newFolder(),
+			className + ".java",
+			"import com.esotericsoftware.kryo.Kryo;\n" +
+				"import com.esotericsoftware.kryo.Serializer;\n" +
+				"import com.esotericsoftware.kryo.io.Input;\n" +
+				"import com.esotericsoftware.kryo.io.Output;\n"
+				+ "public class " + className + " extends Serializer {\n" +
+				"\t@Override\n" +
+				"\tpublic void write(\n" +
+				"\t\tKryo kryo,\n" +
+				"\t\tOutput output,\n" +
+				"\t\tObject object) {\n" +
+				"\t}\n" +
+				"\n" +
+				"\t@Override\n" +
+				"\tpublic Object read(Kryo kryo, Input input, Class type) {\n" +
+				"\t\treturn null;\n" +
+				"\t}\n" +
+				"}");
+		return Arrays.asList(urlClassLoader.getURLs());
+	}
+
+	private interface TestParameter {
+		Class<?> entryClass();
+
+		ExecutionConfig extractExecutionConfig(Pipeline pipeline);
+
+		static TestParameter of(Class<?> entryClass, Function<Pipeline, ExecutionConfig> executionConfigExtractor) {
+			return new TestParameter() {
+				@Override
+				public Class<?> entryClass() {
+					return entryClass;
+				}
+
+				@Override
+				public ExecutionConfig extractExecutionConfig(Pipeline pipeline) {
+					return executionConfigExtractor.apply(pipeline);
+				}
+			};
+		}
+	}
+
+	/** Test Program for the DataSet API. */
+	public static class DataSetTestProgram {
+		public static void main(String[] args) throws Exception {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.fromElements("hello").print();
+			env.execute();
+		}
+	}
+
+	/** Test Program for the DataStream API. */
+	public static class DataStreamTestProgram {
+		public static void main(String[] args) throws Exception {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.fromElements("hello").print();
+			env.execute();
+		}
+	}
+
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
index aa271c5..8c110a7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
@@ -18,14 +18,8 @@
 
 package org.apache.flink.client.program;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
 
 import org.junit.Test;
 
@@ -38,59 +32,12 @@ import static org.junit.Assert.assertThat;
 
 /**
  * Tests {@link PackagedProgramUtils}.
+ *
+ * <p>See also {@link PackagedProgramUtilsPipelineTest} for tests that need to test behaviour of
+ * {@link DataStream} and {@link DataSet} programs.
  */
 public class PackagedProgramUtilsTest {
 
-	/**
-	 * This tests whether configuration forwarding from a {@link Configuration} to the environment
-	 * works.
-	 */
-	@Test
-	public void testDataSetConfigurationForwarding() throws Exception {
-		assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig());
-
-		PackagedProgram packagedProgram = PackagedProgram.newBuilder()
-				.setEntryPointClassName(DataSetTestProgram.class.getName())
-				.build();
-
-		Configuration config = createConfigurationWithOption();
-
-		Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
-				packagedProgram,
-				config,
-				1 /* parallelism */,
-				false /* suppress output */);
-
-		ExecutionConfig executionConfig = ((Plan) pipeline).getExecutionConfig();
-
-		assertExpectedOption(executionConfig);
-	}
-
-	/**
-	 * This tests whether configuration forwarding from a {@link Configuration} to the environment
-	 * works.
-	 */
-	@Test
-	public void testDataStreamConfigurationForwarding() throws Exception {
-		assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig());
-
-		PackagedProgram packagedProgram = PackagedProgram.newBuilder()
-				.setEntryPointClassName(DataStreamTestProgram.class.getName())
-				.build();
-
-		Configuration config = createConfigurationWithOption();
-
-		Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
-				packagedProgram,
-				config,
-				1 /* parallelism */,
-				false /* suppress output */);
-
-		ExecutionConfig executionConfig = ((StreamGraph) pipeline).getExecutionConfig();
-
-		assertExpectedOption(executionConfig);
-	}
-
 	@Test
 	public void testResolveURI() throws URISyntaxException {
 		final String relativeFile = "path/of/user.jar";
@@ -111,38 +58,4 @@ public class PackagedProgramUtilsTest {
 		assertThat(resolveURI(localSchemaFile).getScheme(), is("local"));
 		assertThat(resolveURI(localSchemaFile).toString(), is(localSchemaFile));
 	}
-
-	private static void assertPrecondition(ExecutionConfig executionConfig) {
-		// we want to test forwarding with this config, ensure that the default is what we expect.
-		assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(false));
-	}
-
-	private static void assertExpectedOption(ExecutionConfig executionConfig) {
-		// we want to test forwarding with this config, ensure that the default is what we expect.
-		assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true));
-	}
-
-	private static Configuration createConfigurationWithOption() {
-		Configuration config = new Configuration();
-		config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false);
-		return config;
-	}
-
-	/** Test Program for the DataSet API. */
-	public static class DataSetTestProgram {
-		public static void main(String[] args) throws Exception {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.fromElements("hello").print();
-			env.execute();
-		}
-	}
-
-	/** Test Program for the DataStream API. */
-	public static class DataStreamTestProgram {
-		public static void main(String[] args) throws Exception {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.fromElements("hello").print();
-			env.execute();
-		}
-	}
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 75c8dd1..9f0089d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -146,7 +146,18 @@ public class ExecutionEnvironment {
 	 */
 	@PublicEvolving
 	public ExecutionEnvironment(final Configuration configuration) {
-		this(DefaultExecutorServiceLoader.INSTANCE, configuration, null);
+		this(configuration, null);
+	}
+
+	/**
+	 * Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
+	 * configure the {@link PipelineExecutor}.
+	 *
+	 * <p>In addition, this constructor allows specifying the user code {@link ClassLoader}.
+	 */
+	@PublicEvolving
+	public ExecutionEnvironment(final Configuration configuration, final ClassLoader userClassloader) {
+		this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
 	}
 
 	/**
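
The new constructor is @PublicEvolving; a minimal usage sketch (imports and
exception handling omitted, the jar URL is a placeholder):

    // Hypothetical user-code class loader backed by the user's job jar.
    ClassLoader userClassLoader = new URLClassLoader(
        new URL[] { new File("/path/to/user-job.jar").toURI().toURL() },
        ExecutionEnvironment.class.getClassLoader());

    Configuration configuration = new Configuration();
    // Configuration entries that name user classes (e.g. default Kryo
    // serializers) are resolved against this class loader.
    ExecutionEnvironment env = new ExecutionEnvironment(configuration, userClassLoader);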
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7ee04e7..c688506 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -190,7 +190,20 @@ public class StreamExecutionEnvironment {
 	 */
 	@PublicEvolving
 	public StreamExecutionEnvironment(final Configuration configuration) {
-		this(DefaultExecutorServiceLoader.INSTANCE, configuration, null);
+		this(configuration, null);
+	}
+
+	/**
+	 * Creates a new {@link StreamExecutionEnvironment} that will use the given {@link
+	 * Configuration} to configure the {@link PipelineExecutor}.
+	 *
+	 * <p>In addition, this constructor allows specifying the user code {@link ClassLoader}.
+	 */
+	@PublicEvolving
+	public StreamExecutionEnvironment(
+			final Configuration configuration,
+			final ClassLoader userClassloader) {
+		this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
 	}
 
 	/**
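
The streaming constructor mirrors the batch one; a sketch under the same
assumptions as above, with a placeholder class loader:

    ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();

    Configuration configuration = new Configuration();
    StreamExecutionEnvironment env =
        new StreamExecutionEnvironment(configuration, userClassLoader);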


[flink] 01/02: [hotfix] Improve exception message for parsing kryo serializer classes from config

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1472a8b504a02baf226e97469161ccc3fc0d98b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jun 11 12:07:58 2020 +0200

    [hotfix] Improve exception message for parsing kryo serializer classes from config
---
 .../src/main/java/org/apache/flink/api/common/ExecutionConfig.java  | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 291bced..73ff232 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -1222,7 +1222,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 		try {
 			return parseKryoSerializers(classLoader, kryoSerializers);
 		} catch (Exception e) {
-			throw new IllegalArgumentException("Could not configure kryo serializers from " + kryoSerializers);
+			throw new IllegalArgumentException(
+				String.format(
+					"Could not configure kryo serializers from %s. The expected format is:" +
+						"'class:<fully qualified class name>,serializer:<fully qualified serializer name>;...",
+					kryoSerializers), e);
 		}
 	}
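
For reference, the format the improved message points at is the one used by
the new PackagedProgramUtilsPipelineTest: one "class:...,serializer:..." pair
per list entry of PipelineOptions.KRYO_DEFAULT_SERIALIZERS. A small sketch
with hypothetical class names:

    Configuration configuration = new Configuration();
    // Each entry pairs a class with its Kryo serializer, both fully qualified.
    configuration.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, Arrays.asList(
        "class:com.example.MyType,serializer:com.example.MyTypeSerializer",
        "class:com.example.OtherType,serializer:com.example.OtherTypeSerializer"));
    // A malformed entry now fails with the more descriptive IllegalArgumentException
    // added above, which also chains the original parse exception as its cause.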