You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/24 08:48:20 UTC

[1/3] flink git commit: [hotfix] [tests] Small simplification in 'CheckedThread'.

Repository: flink
Updated Branches:
  refs/heads/master 1c9c1e36c -> f1e4d25c1


[hotfix] [tests] Small simplification in 'CheckedThread'.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e894ee1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e894ee1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e894ee1

Branch: refs/heads/master
Commit: 6e894ee13a3221fd98869c9d1c6c105729143dd2
Parents: 1c9c1e3
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 19:04:03 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 09:47:35 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/core/testutils/CheckedThread.java   | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e894ee1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index 8de106d..f2647cc 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -34,8 +34,6 @@ public abstract class CheckedThread extends Thread {
 	/** The error thrown from the main work method. */
 	private volatile Throwable error;
 
-	private volatile boolean finished = false;
-
 	// ------------------------------------------------------------------------
 
 	/**
@@ -78,9 +76,6 @@ public abstract class CheckedThread extends Thread {
 		catch (Throwable t) {
 			error = t;
 		}
-		finally {
-			finished = true;
-		}
 	}
 
 	/**
@@ -123,7 +118,7 @@ public abstract class CheckedThread extends Thread {
 	}
 
 	private void checkFinished() throws Exception {
-		if (!finished) {
+		if (getState() != State.TERMINATED) {
 			throw new Exception(String.format(
 				"%s[name = %s] has not finished!",
 				this.getClass().getSimpleName(),


[3/3] flink git commit: [hotfix] [build] Fix diverging snappy versions.

Posted by se...@apache.org.
[hotfix] [build] Fix diverging snappy versions.

This removes the snappy dependency from flink-core, which is no longer needed since we do
not have an Avro dependency in flink-core any more.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1e4d25c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1e4d25c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1e4d25c

Branch: refs/heads/master
Commit: f1e4d25c11a678688064492d50ffad38c39ea877
Parents: 1b466c0
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 21:01:36 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 09:47:36 2018 +0100

----------------------------------------------------------------------
 flink-core/pom.xml    | 6 ------
 flink-runtime/pom.xml | 1 -
 pom.xml               | 2 +-
 3 files changed, 1 insertion(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1e4d25c/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 7ec8097..567de5c 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,12 +80,6 @@ under the License.
 			<!-- managed version -->
 		</dependency>
 
-		<!-- We explicitly depend on snappy since connectors that require it load it through the system class loader -->
-		<dependency>
-			<groupId>org.xerial.snappy</groupId>
-			<artifactId>snappy-java</artifactId>
-		</dependency>
-
 		<!-- ================== test dependencies ================== -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/f1e4d25c/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index ce29e97..ad2c974 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -168,7 +168,6 @@ under the License.
 		<dependency>
 			<groupId>org.xerial.snappy</groupId>
 			<artifactId>snappy-java</artifactId>
-			<version>1.1.4</version>
 		</dependency>
 
 		<!--

http://git-wip-us.apache.org/repos/asf/flink/blob/f1e4d25c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 317b36a..3065abf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -300,7 +300,7 @@ under the License.
 			<dependency>
 				<groupId>org.xerial.snappy</groupId>
 				<artifactId>snappy-java</artifactId>
-				<version>1.1.1.3</version>
+				<version>1.1.4</version>
 			</dependency>
 
 			<!-- Make sure we use a consistent avro version between Flink and Hadoop -->		


[2/3] flink git commit: [FLINK-8499] [core] Force Kryo to be parent-first loaded.

Posted by se...@apache.org.
[FLINK-8499] [core] Force Kryo to be parent-first loaded.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b466c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b466c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b466c05

Branch: refs/heads/master
Commit: 1b466c055d9d4cd481096af770118c7a899a90af
Parents: 6e894ee
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 19:58:10 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 09:47:36 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/CoreOptions.java |  2 +-
 .../formats/avro/AvroKryoClassloadingTest.java  | 89 ++++++++++++++++++++
 .../core/testutils/FilteredClassLoader.java     | 60 +++++++++++++
 3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b466c05/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index cd93b99..191a0aa 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -79,7 +79,7 @@ public class CoreOptions {
 	 */
 	public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER = ConfigOptions
 		.key("classloader.parent-first-patterns")
-		.defaultValue("java.;scala.;org.apache.flink.;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
+		.defaultValue("java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
 
 	// ------------------------------------------------------------------------
 	//  process parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/1b466c05/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
new file mode 100644
index 0000000..6eaca15
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.testutils.FilteredClassLoader;
+import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.LinkedHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * This test makes sure that reversed classloading works for the Avro/Kryo integration when
+ * Kryo is in the application jar file.
+ *
+ * <p>If Kryo is not loaded consistently through the same classloader (parent-first), the following
+ * error happens:
+ *
+ * <pre>
+ * java.lang.VerifyError: Bad type on operand stack
+ * Exception Details:
+ *   Location:
+ *  org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
+ *   Reason:
+ *     Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
+ *   Current Frame:
+ *     bci: @23
+ *     flags: { }
+ *     locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
+ *     stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' }
+ *   Bytecode:
+ *     0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
+ *     0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011
+ *     0x0000020: 57b1
+ * </pre>
+ */
+public class AvroKryoClassloadingTest {
+
+	@Test
+	public void testKryoInChildClasspath() throws Exception {
+		final Class<?> avroClass = AvroKryoSerializerUtils.class;
+
+		final URL avroLocation = avroClass.getProtectionDomain().getCodeSource().getLocation();
+		final URL kryoLocation = Kryo.class.getProtectionDomain().getCodeSource().getLocation();
+
+		final ClassLoader parentClassLoader = new FilteredClassLoader(
+				avroClass.getClassLoader(), AvroKryoSerializerUtils.class.getName());
+
+		final ClassLoader userAppClassLoader = FlinkUserCodeClassLoaders.childFirst(
+				new URL[] { avroLocation, kryoLocation },
+				parentClassLoader,
+				CoreOptions.ALWAYS_PARENT_FIRST_LOADER.defaultValue().split(";"));
+
+		final Class<?> userLoadedAvroClass = Class.forName(avroClass.getName(), false, userAppClassLoader);
+		assertNotEquals(avroClass, userLoadedAvroClass);
+
+		// call the 'addAvroGenericDataArrayRegistration(...)' method
+		final Method m = userLoadedAvroClass.getMethod("addAvroGenericDataArrayRegistration", LinkedHashMap.class);
+
+		final LinkedHashMap<String, ?> map = new LinkedHashMap<>();
+		m.invoke(userLoadedAvroClass.newInstance(), map);
+
+		assertEquals(1, map.size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b466c05/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
new file mode 100644
index 0000000..f04393b
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+
+/**
+ * A ClassLoader that filters out certain classes (by name) and throws a ClassNotFoundException
+ * when they should be loaded.
+ *
+ * <p>This utility is useful when trying to eliminate certain classes from a class loader
+ * force loading them through another class loader.
+ */
+public class FilteredClassLoader extends ClassLoader {
+
+	/** The set of class names for the filtered classes. */
+	private final HashSet<String> filteredClassNames;
+
+	/**
+	 * Creates a new filtered classloader.
+	 *
+	 * @param delegate The class loader that is filtered by this classloader.
+	 * @param filteredClassNames The class names to filter out.
+	 */
+	public FilteredClassLoader(ClassLoader delegate, String... filteredClassNames) {
+		super(Objects.requireNonNull(delegate));
+
+		this.filteredClassNames = new HashSet<>(Arrays.asList(filteredClassNames));
+	}
+
+	@Override
+	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+		synchronized (this) {
+			if (filteredClassNames.contains(name)) {
+				throw new ClassNotFoundException(name);
+			}
+			else {
+				return super.loadClass(name, resolve);
+			}
+		}
+	}
+}