Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 08:49:17 UTC

[flink] branch master updated (3a56132 -> 403ec23)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3a56132  [FLINK-24208][rest] Add StopWithSavepointTriggerRequestBodyTest
     new e797108  [FLINK-24603][e2e] Simplify FlinkDistribution#submitJob
     new 24fd6d5  [FLINK-24603][e2e] Allow arbitrary jars to be added to the distribution
     new 295a1e3  [FLINK-24603][e2e] Add support for setting job main class
     new 403ec23  [FLINK-24603][e2e] Add test for Scala-free Flink

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/tests/util/flink/FlinkDistribution.java  |  48 ++++--
 .../flink/tests/util/flink/FlinkResourceSetup.java |  21 ++-
 .../flink/{JarLocation.java => JarAddition.java}   |  25 ++-
 .../flink/tests/util/flink/JobSubmission.java      |  25 ++-
 .../util/flink/LocalStandaloneFlinkResource.java   |   3 +
 .../flink-end-to-end-tests-scala}/pom.xml          | 178 ++++++++-------------
 .../java/org/apache/flink/tests/scala/JavaJob.java |  26 ++-
 .../tests/scala/JavaJobWithKryoSerializer.java     |  26 ++-
 .../org/apache/flink/tests/scala/NonPojo.java}     |  13 +-
 .../flink/tests/scala/NonPojoSerializer.java       |  23 +--
 .../org/apache/flink/tests/scala/ScalaJob.scala    |  42 ++---
 .../apache/flink/tests/scala/ScalaFreeITCase.java  | 130 +++++++++++++++
 .../src/test/resources/log4j2-test.properties      |   0
 flink-end-to-end-tests/pom.xml                     |   1 +
 14 files changed, 357 insertions(+), 204 deletions(-)
 copy flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/{JarLocation.java => JarAddition.java} (68%)
 copy {flink-scala => flink-end-to-end-tests/flink-end-to-end-tests-scala}/pom.xml (58%)
 copy flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/HostAndPort.java => flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJob.java (60%)
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkAddressEvent.java => flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java (55%)
 copy flink-end-to-end-tests/{flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java => flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/NonPojo.java} (75%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/NoOpExecutionDeploymentListener.java => flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/NonPojoSerializer.java (60%)
 copy flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala => flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/scala/org/apache/flink/tests/scala/ScalaJob.scala (52%)
 create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
 copy {flink-yarn => flink-end-to-end-tests/flink-end-to-end-tests-scala}/src/test/resources/log4j2-test.properties (100%)

[flink] 01/04: [FLINK-24603][e2e] Simplify FlinkDistribution#submitJob

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e79710849706fd275a04beccfe24d4d61e1c38f9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Oct 28 12:18:45 2021 +0200

    [FLINK-24603][e2e] Simplify FlinkDistribution#submitJob
    
    We wait for the process to complete either way (for the submission when detached, or for the job completion when attached, in order to parse the JobID), so we can simply run the submission process in a blocking fashion.
---
 .../flink/tests/util/flink/FlinkDistribution.java  | 24 +++++++---------------
 1 file changed, 7 insertions(+), 17 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 0632f62..4f779d5 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -188,24 +188,14 @@ final class FlinkDistribution {
                     }
                 };
 
-        try (AutoClosableProcess flink =
-                AutoClosableProcess.create(commands.toArray(new String[0]))
-                        .setStdoutProcessor(stdoutProcessor)
-                        .runNonBlocking()) {
-            if (jobSubmission.isDetached()) {
-                try {
-                    flink.getProcess().waitFor();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
+        AutoClosableProcess.create(commands.toArray(new String[0]))
+                .setStdoutProcessor(stdoutProcessor)
+                .runBlocking();
 
-            try {
-                return JobID.fromHexString(
-                        rawJobIdFuture.get(timeout.getSeconds(), TimeUnit.SECONDS));
-            } catch (Exception e) {
-                throw new IOException("Could not determine Job ID.", e);
-            }
+        try {
+            return JobID.fromHexString(rawJobIdFuture.get(timeout.getSeconds(), TimeUnit.SECONDS));
+        } catch (Exception e) {
+            throw new IOException("Could not determine Job ID.", e);
         }
     }
 

[flink] 02/04: [FLINK-24603][e2e] Allow arbitrary jars to be added to the distribution

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24fd6d551f776f229acc89f3198cbcf8d8966ea6
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Oct 28 12:19:40 2021 +0200

    [FLINK-24603][e2e] Allow arbitrary jars to be added to the distribution
    
    Required for adding a user-provided Scala library to lib.
---
 .../flink/tests/util/flink/FlinkDistribution.java  | 17 +++++++++
 .../flink/tests/util/flink/FlinkResourceSetup.java | 21 ++++++++++--
 .../apache/flink/tests/util/flink/JarAddition.java | 40 ++++++++++++++++++++++
 .../util/flink/LocalStandaloneFlinkResource.java   |  3 ++
 4 files changed, 79 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 4f779d5..cd3d935 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -33,6 +33,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
+import org.apache.commons.io.FilenameUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
@@ -213,6 +214,22 @@ final class FlinkDistribution {
                 .runBlocking(timeout);
     }
 
+    public void performJarAddition(JarAddition addition) throws IOException {
+        final Path target = mapJarLocationToPath(addition.getTarget());
+        final Path sourceJar = addition.getJar();
+
+        final String jarNameWithoutExtension =
+                FilenameUtils.removeExtension(sourceJar.getFileName().toString());
+
+        // put the jar into a directory within the target location; this is primarily needed for
+        // plugins/, but also works for lib/
+        final Path targetJar =
+                target.resolve(jarNameWithoutExtension).resolve(sourceJar.getFileName());
+        Files.createDirectories(targetJar.getParent());
+
+        Files.copy(sourceJar, targetJar);
+    }
+
     public void performJarOperation(JarOperation operation) throws IOException {
         final Path source = mapJarLocationToPath(operation.getSource());
         final Path target = mapJarLocationToPath(operation.getTarget());
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceSetup.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceSetup.java
index 7c8a1a1..1256e09 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceSetup.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceSetup.java
@@ -22,6 +22,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,11 +33,15 @@ public class FlinkResourceSetup {
 
     @Nullable private final Configuration config;
     private final Collection<JarOperation> jarOperations;
+    private final Collection<JarAddition> jarAdditions;
 
     private FlinkResourceSetup(
-            @Nullable Configuration config, Collection<JarOperation> jarOperations) {
+            @Nullable Configuration config,
+            Collection<JarOperation> jarOperations,
+            Collection<JarAddition> jarAdditions) {
         this.config = config;
         this.jarOperations = Preconditions.checkNotNull(jarOperations);
+        this.jarAdditions = Preconditions.checkNotNull(jarAdditions);
     }
 
     public Optional<Configuration> getConfig() {
@@ -47,6 +52,10 @@ public class FlinkResourceSetup {
         return jarOperations;
     }
 
+    public Collection<JarAddition> getJarAdditions() {
+        return jarAdditions;
+    }
+
     public static FlinkResourceSetupBuilder builder() {
         return new FlinkResourceSetupBuilder();
     }
@@ -56,6 +65,7 @@ public class FlinkResourceSetup {
 
         private Configuration config;
         private final Collection<JarOperation> jarOperations = new ArrayList<>();
+        private final Collection<JarAddition> jarAdditions = new ArrayList<>();
 
         private FlinkResourceSetupBuilder() {}
 
@@ -64,6 +74,11 @@ public class FlinkResourceSetup {
             return this;
         }
 
+        public FlinkResourceSetupBuilder addJar(Path jar, JarLocation target) {
+            this.jarAdditions.add(new JarAddition(jar, target));
+            return this;
+        }
+
         public FlinkResourceSetupBuilder moveJar(
                 String jarNamePrefix, JarLocation source, JarLocation target) {
             this.jarOperations.add(
@@ -82,7 +97,9 @@ public class FlinkResourceSetup {
 
         public FlinkResourceSetup build() {
             return new FlinkResourceSetup(
-                    config, Collections.unmodifiableCollection(jarOperations));
+                    config,
+                    Collections.unmodifiableCollection(jarOperations),
+                    Collections.unmodifiableCollection(jarAdditions));
         }
     }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JarAddition.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JarAddition.java
new file mode 100644
index 0000000..8ea42b7
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JarAddition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink;
+
+import java.nio.file.Path;
+
+/** Represents an add operation for a jar. */
+class JarAddition {
+
+    private final Path jar;
+    private final JarLocation target;
+
+    JarAddition(Path jar, JarLocation target) {
+        this.jar = jar;
+        this.target = target;
+    }
+
+    public Path getJar() {
+        return jar;
+    }
+
+    public JarLocation getTarget() {
+        return target;
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index 429e4f5..b5e2d75 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -85,6 +85,9 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
         for (JarOperation jarOperation : setup.getJarOperations()) {
             distribution.performJarOperation(jarOperation);
         }
+        for (JarAddition jarAddition : setup.getJarAdditions()) {
+            distribution.performJarAddition(jarAddition);
+        }
         if (setup.getConfig().isPresent()) {
             distribution.appendConfiguration(setup.getConfig().get());
         }
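
The new hook is meant to be used through FlinkResourceSetup before the cluster is started: the builder records a JarAddition, and performJarAddition copies the jar into a per-jar subdirectory of the chosen location (required for plugins/, harmless for lib/). A minimal usage sketch, assuming the jar is available as a test resource (the resource name below is a placeholder):

    import java.nio.file.Path;

    import org.apache.flink.tests.util.TestUtils;
    import org.apache.flink.tests.util.flink.FlinkResource;
    import org.apache.flink.tests.util.flink.FlinkResourceSetup;
    import org.apache.flink.tests.util.flink.JarLocation;

    // Sketch only: stage an extra jar in the distribution's lib/ before the
    // cluster starts; "/my-library.jar" is a placeholder test resource name.
    Path extraJar = TestUtils.getResource("/my-library.jar");

    FlinkResourceSetup setup =
            FlinkResourceSetup.builder()
                    .addJar(extraJar, JarLocation.LIB) // new addJar hook from this commit
                    .build();

    FlinkResource flink = FlinkResource.get(setup);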

[flink] 03/04: [FLINK-24603][e2e] Add support for setting job main class

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 295a1e352605206c9c1d91969ec3eeb9dc983bf4
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Oct 28 12:20:09 2021 +0200

    [FLINK-24603][e2e] Add support for setting job main class
---
 .../flink/tests/util/flink/FlinkDistribution.java  |  7 ++++++
 .../flink/tests/util/flink/JobSubmission.java      | 25 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index cd3d935..0988e43 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -169,6 +169,13 @@ final class FlinkDistribution {
             commands.add("-p");
             commands.add(String.valueOf(jobSubmission.getParallelism()));
         }
+        jobSubmission
+                .getMainClass()
+                .ifPresent(
+                        mainClass -> {
+                            commands.add("--class");
+                            commands.add(mainClass);
+                        });
         commands.add(jobSubmission.getJar().toAbsolutePath().toString());
         commands.addAll(jobSubmission.getArguments());
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JobSubmission.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JobSubmission.java
index ed3eabd..90e35be 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JobSubmission.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JobSubmission.java
@@ -20,25 +20,32 @@ package org.apache.flink.tests.util.flink;
 
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /** Programmatic definition of a job-submission. */
 public class JobSubmission {
 
     private final Path jar;
+
+    private final String mainClass;
     private final int parallelism;
     private final boolean detached;
     private final List<String> arguments;
 
     JobSubmission(
             final Path jar,
+            @Nullable final String mainClass,
             final int parallelism,
             final boolean detached,
             final List<String> arguments) {
         this.jar = jar;
+        this.mainClass = mainClass;
         this.parallelism = parallelism;
         this.detached = detached;
         this.arguments = Collections.unmodifiableList(arguments);
@@ -60,12 +67,17 @@ public class JobSubmission {
         return jar;
     }
 
+    public Optional<String> getMainClass() {
+        return Optional.ofNullable(mainClass);
+    }
+
     /** Builder for the {@link JobSubmission}. */
     public static class JobSubmissionBuilder {
         private final Path jar;
         private int parallelism = 0;
         private final List<String> arguments = new ArrayList<>(2);
         private boolean detached = false;
+        private String mainClass = null;
 
         public JobSubmissionBuilder(final Path jar) {
             Preconditions.checkNotNull(jar);
@@ -74,6 +86,17 @@ public class JobSubmission {
         }
 
         /**
+         * Sets the main class for the job.
+         *
+         * @param mainClass main class for the job
+         * @return the modified builder
+         */
+        public JobSubmissionBuilder setMainClass(final String mainClass) {
+            this.mainClass = mainClass;
+            return this;
+        }
+
+        /**
          * Sets the parallelism for the job.
          *
          * @param parallelism parallelism for the job
@@ -122,7 +145,7 @@ public class JobSubmission {
         }
 
         public JobSubmission build() {
-            return new JobSubmission(jar, parallelism, detached, arguments);
+            return new JobSubmission(jar, mainClass, parallelism, detached, arguments);
         }
     }
 }
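
With this option a test can point the Flink CLI at a specific entry point inside a fat jar instead of relying on the jar manifest; FlinkDistribution turns it into a --class argument on the flink run command line. A minimal sketch (the jar path and class name are placeholders):

    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.flink.tests.util.flink.JobSubmission;

    // Sketch only: pick a specific entry point out of a multi-job fat jar.
    Path jobJar = Paths.get("/path/to/jobs.jar");

    JobSubmission submission =
            new JobSubmission.JobSubmissionBuilder(jobJar)
                    .setDetached(false)
                    .setMainClass("org.example.MyJob") // new setMainClass option from this commit
                    .build();

    // submitted against a running cluster, e.g.:
    // clusterController.submitJob(submission, Duration.ofHours(1));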

[flink] 04/04: [FLINK-24603][e2e] Add test for Scala-free Flink

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 403ec23921435dfb57614410754678d84dd0a276
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Oct 28 13:43:32 2021 +0200

    [FLINK-24603][e2e] Add test for Scala-free Flink
---
 .../flink-end-to-end-tests-scala/pom.xml           | 229 +++++++++++++++++++++
 .../java/org/apache/flink/tests/scala/JavaJob.java |  34 +++
 .../tests/scala/JavaJobWithKryoSerializer.java     |  36 ++++
 .../java/org/apache/flink/tests/scala/NonPojo.java |  28 +++
 .../flink/tests/scala/NonPojoSerializer.java       |  38 ++++
 .../org/apache/flink/tests/scala/ScalaJob.scala    |  47 +++++
 .../apache/flink/tests/scala/ScalaFreeITCase.java  | 130 ++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 flink-end-to-end-tests/pom.xml                     |   1 +
 9 files changed, 571 insertions(+)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-scala/pom.xml
new file mode 100644
index 0000000..0a514c8
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/pom.xml
@@ -0,0 +1,229 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+		 xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.15-SNAPSHOT</version>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-end-to-end-tests-scala</artifactId>
+	<name>Flink : E2E Tests : Scala</name>
+
+	<properties>
+		<!-- Use an old version that is not actively supported by Flink.
+		 	2.11 still 'just works' with our current maven setup,
+		 	and going forward we won't add support for it again.-->
+		<scala.version>2.11.12</scala.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-reflect</artifactId>
+			<version>${scala.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+			<version>${scala.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-compiler</artifactId>
+			<version>${scala.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<!-- we intentionally use an unsupported scala version -->
+					<execution>
+						<id>enforce-versions</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<phase>none</phase>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>Scala</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>scala</finalName>
+							<artifactSet>
+								<includes>
+									<include>org.scala-lang*:*</include>
+								</includes>
+							</artifactSet>
+						</configuration>
+					</execution>
+					<execution>
+						<id>Jobs</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>jobs</finalName>
+							<artifactSet>
+								<includes>
+									<include>${project.groupId}:${project.artifactId}</include>
+								</includes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/java</source>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Code Style, most of the configuration done via plugin management -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<configuration>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+				</configuration>
+			</plugin>
+
+			<!-- Add Scala test classes to test jar in order to test Scala type information. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
\ No newline at end of file
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJob.java b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJob.java
new file mode 100644
index 0000000..51ff449
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJob.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.scala;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** Simple job in pure Java. */
+public class JavaJob {
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // we want to go through serialization to check for kryo issues
+        env.disableOperatorChaining();
+
+        env.fromElements(new NonPojo()).map(x -> x);
+
+        env.execute();
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java
new file mode 100644
index 0000000..4c6a67e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.scala;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** Simple job in pure Java that uses a custom Kryo serializer. */
+public class JavaJobWithKryoSerializer {
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // we want to go through serialization to check for kryo issues
+        env.disableOperatorChaining();
+
+        env.addDefaultKryoSerializer(NonPojo.class, NonPojoSerializer.class);
+
+        env.fromElements(new NonPojo()).map(x -> x);
+
+        env.execute();
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/NonPojo.java b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/NonPojo.java
new file mode 100644
index 0000000..5f4d95d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/NonPojo.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.scala;
+
+/** Simple type that needs to go through Kryo for serialization. */
+public class NonPojo {
+    private final int someInt = 34;
+    private final String someString = "hello";
+
+    public int getSomeInt() {
+        return someInt;
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/NonPojoSerializer.java b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/NonPojoSerializer.java
new file mode 100644
index 0000000..19ce251
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/NonPojoSerializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.scala;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/** Kryo serializer for {@link NonPojo}. */
+public class NonPojoSerializer extends Serializer<NonPojo> {
+
+    @Override
+    public void write(Kryo kryo, Output output, NonPojo object) {
+        output.writeInt(object.getSomeInt());
+    }
+
+    @Override
+    public NonPojo read(Kryo kryo, Input input, Class<NonPojo> type) {
+        input.readInt();
+        return new NonPojo();
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/scala/org/apache/flink/tests/scala/ScalaJob.scala b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/scala/org/apache/flink/tests/scala/ScalaJob.scala
new file mode 100644
index 0000000..8af5eec
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/scala/org/apache/flink/tests/scala/ScalaJob.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.tests.scala
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+import scala.runtime.BoxesRunTime
+
+/**
+ * A Scala job that can only run with Scala 2.11.
+ *
+ * <p>This job also acts as a stand-in for Java jobs using some Scala library.
+ */
+object ScalaJob {
+  def main(args: Array[String]): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+    // we want to go through serialization to check for kryo issues
+    env.disableOperatorChaining()
+
+    env.fromElements(new NonPojo()).map(new MapFunction[NonPojo, NonPojo] {
+      override def map(value: NonPojo): NonPojo = {
+        // use some method that was removed in 2.12+
+        BoxesRunTime.hashFromNumber(value.getSomeInt)
+        value
+      }
+    })
+
+    env.execute();
+  }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
new file mode 100644
index 0000000..005d454
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.scala;
+
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.categories.TravisGroup1;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.JarLocation;
+import org.apache.flink.tests.util.flink.JobSubmission;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+
+/**
+ * Tests that Flink does not require Scala for jobs that do not use the Scala APIs. This covers both
+ * pure Java jobs, and Scala jobs that use the Java APIs exclusively with Scala types.
+ */
+@Category(value = {TravisGroup1.class})
+@RunWith(Parameterized.class)
+public class ScalaFreeITCase extends TestLogger {
+
+    @Rule
+    public final TestExecutorResource<ScheduledExecutorService> testExecutorResource =
+            new TestExecutorResource<>(
+                    java.util.concurrent.Executors::newSingleThreadScheduledExecutor);
+
+    @Rule public final FlinkResource flink;
+    private final String mainClass;
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<TestParams> testParameters() {
+        return Arrays.asList(
+                new TestParams("Java job, without Scala in lib/", JavaJob.class.getCanonicalName()),
+                new TestParams(
+                        "Java job with Kryo serializer, without Scala in lib/",
+                        JavaJobWithKryoSerializer.class.getCanonicalName()),
+                new TestParams(
+                        "Scala job, with user-provided Scala in lib/",
+                        ScalaJob.class.getCanonicalName(),
+                        builder ->
+                                builder.addJar(
+                                        TestUtils.getResource("/scala.jar"), JarLocation.LIB)));
+    }
+
+    public ScalaFreeITCase(TestParams testParams) {
+        final FlinkResourceSetup.FlinkResourceSetupBuilder builder =
+                FlinkResourceSetup.builder()
+                        .moveJar("flink-scala", JarLocation.LIB, JarLocation.OPT);
+        testParams.builderSetup.accept(builder);
+        flink = FlinkResource.get(builder.build());
+        mainClass = testParams.mainClass;
+    }
+
+    @Test
+    public void testScalaFreeJobExecution() throws Exception {
+        final Path jobJar = TestUtils.getResource("/jobs.jar");
+
+        try (final ClusterController clusterController = flink.startCluster(1)) {
+            // if the job fails then this throws an exception
+            clusterController.submitJob(
+                    new JobSubmission.JobSubmissionBuilder(jobJar)
+                            .setDetached(false)
+                            .setMainClass(mainClass)
+                            .build(),
+                    Duration.ofHours(1));
+        }
+    }
+
+    static class TestParams {
+
+        private final String description;
+        private final String mainClass;
+        private final Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup;
+
+        TestParams(String description, String mainClass) {
+            this(description, mainClass, ignored -> {});
+        }
+
+        TestParams(
+                String description,
+                String mainClass,
+                Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup) {
+            this.description = description;
+            this.mainClass = mainClass;
+            this.builderSetup = builderSetup;
+        }
+
+        public String getMainClass() {
+            return mainClass;
+        }
+
+        public Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> getBuilderSetup() {
+            return builderSetup;
+        }
+
+        @Override
+        public String toString() {
+            return description;
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index f9a166a..919da38 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -90,6 +90,7 @@ under the License.
 		<module>flink-end-to-end-tests-pulsar</module>
 		<module>flink-glue-schema-registry-avro-test</module>
 		<module>flink-glue-schema-registry-json-test</module>
+		<module>flink-end-to-end-tests-scala</module>
 	</modules>
 
 	<dependencyManagement>
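
The two maven-shade-plugin executions in the new pom.xml drive the whole setup: the "Scala" execution bundles only the org.scala-lang artifacts into scala.jar (the user-provided Scala runtime that the test can stage into lib/), while the "Jobs" execution bundles only this module's own classes into jobs.jar (the job jar that is submitted, which deliberately excludes the Scala runtime). A short recap sketch of how ScalaFreeITCase consumes them, using only the APIs introduced in the commits above:

    import java.nio.file.Path;

    import org.apache.flink.tests.util.TestUtils;
    import org.apache.flink.tests.util.flink.FlinkResourceSetup;
    import org.apache.flink.tests.util.flink.JarLocation;

    // Recap sketch; the resource names match the shade <finalName>s above.
    Path scalaJar = TestUtils.getResource("/scala.jar"); // relocated Scala runtime
    Path jobsJar = TestUtils.getResource("/jobs.jar");   // the test jobs, without the Scala runtime

    FlinkResourceSetup setup =
            FlinkResourceSetup.builder()
                    // move Flink's own Scala support out of lib/ ...
                    .moveJar("flink-scala", JarLocation.LIB, JarLocation.OPT)
                    // ... and, for the Scala job case, stage the user-provided runtime instead
                    .addJar(scalaJar, JarLocation.LIB)
                    .build();
    // jobsJar is then submitted through JobSubmission, selecting the desired main class.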