You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/24 21:30:58 UTC

[3/5] flink git commit: [FLINK-8499] [core] Force Kryo to be parent-first loaded.

[FLINK-8499] [core] Force Kryo to be parent-first loaded.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15cb057b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15cb057b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15cb057b

Branch: refs/heads/release-1.4
Commit: 15cb057bffd32ba8a853b46b207a5b7ea6bba430
Parents: da8446e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 19:58:10 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 18:06:14 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/CoreOptions.java |  2 +-
 .../formats/avro/AvroKryoClassloadingTest.java  | 89 ++++++++++++++++++++
 .../core/testutils/FilteredClassLoader.java     | 60 +++++++++++++
 3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index c48e5ef..27f39a4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -36,7 +36,7 @@ public class CoreOptions {
 
 	public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER = ConfigOptions
 		.key("classloader.parent-first-patterns")
-		.defaultValue("java.;scala.;org.apache.flink.;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
+		.defaultValue("java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
 
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
new file mode 100644
index 0000000..6eaca15
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.testutils.FilteredClassLoader;
+import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.LinkedHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * This test makes sure that reversed classloading works for the Avro/Kryo integration when
+ * Kryo is in the application jar file.
+ *
+ * <p>If Kryo is not loaded consistently through the same classloader (parent-first), the following
+ * error happens:
+ *
+ * <pre>
+ * java.lang.VerifyError: Bad type on operand stack
+ * Exception Details:
+ *   Location:
+ *  org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
+ *   Reason:
+ *     Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
+ *   Current Frame:
+ *     bci: @23
+ *     flags: { }
+ *     locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
+ *     stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' }
+ *   Bytecode:
+ *     0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
+ *     0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011
+ *     0x0000020: 57b1
+ * </pre>
+ */
+public class AvroKryoClassloadingTest {
+
+	@Test
+	public void testKryoInChildClasspath() throws Exception {
+		final Class<?> avroClass = AvroKryoSerializerUtils.class;
+		// code source locations of the Avro utils and of Kryo; both are handed to the user-code loader below
+		final URL avroLocation = avroClass.getProtectionDomain().getCodeSource().getLocation();
+		final URL kryoLocation = Kryo.class.getProtectionDomain().getCodeSource().getLocation();
+		// the parent refuses to load AvroKryoSerializerUtils (ClassNotFoundException), so it cannot come from there
+		final ClassLoader parentClassLoader = new FilteredClassLoader(
+				avroClass.getClassLoader(), AvroKryoSerializerUtils.class.getName());
+		// user-code loader using the default parent-first patterns (presumably resolves other classes child-first)
+		final ClassLoader userAppClassLoader = FlinkUserCodeClassLoaders.childFirst(
+				new URL[] { avroLocation, kryoLocation },
+				parentClassLoader,
+				CoreOptions.ALWAYS_PARENT_FIRST_LOADER.defaultValue().split(";"));
+		// sanity check: the class obtained via the user-code loader is not the one from the test's own classloader
+		final Class<?> userLoadedAvroClass = Class.forName(avroClass.getName(), false, userAppClassLoader);
+		assertNotEquals(avroClass, userLoadedAvroClass);
+
+		// reflectively call 'addAvroGenericDataArrayRegistration(...)'; see the class Javadoc for the VerifyError this guards against
+		final Method m = userLoadedAvroClass.getMethod("addAvroGenericDataArrayRegistration", LinkedHashMap.class);
+
+		final LinkedHashMap<String, ?> map = new LinkedHashMap<>();
+		m.invoke(userLoadedAvroClass.newInstance(), map);
+		// the call is expected to have put exactly one entry into the map
+		assertEquals(1, map.size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
new file mode 100644
index 0000000..f04393b
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+
+/**
+ * A ClassLoader that filters out certain classes (by name) and throws a ClassNotFoundException
+ * when they should be loaded.
+ *
+ * <p>This utility is useful when trying to eliminate certain classes from a class loader
+ * to force loading them through another class loader.
+ */
+public class FilteredClassLoader extends ClassLoader {
+
+	/** The set of class names for the filtered classes; compared by exact string match in {@code loadClass}. */
+	private final HashSet<String> filteredClassNames;
+
+	/**
+	 * Creates a new filtered classloader that uses {@code delegate} as its parent class loader.
+	 *
+	 * @param delegate The class loader that is filtered by this classloader; must not be null.
+	 * @param filteredClassNames The exact class names to filter out (no prefix or pattern matching).
+	 */
+	public FilteredClassLoader(ClassLoader delegate, String... filteredClassNames) {
+		super(Objects.requireNonNull(delegate));
+		// copy the varargs array into a set for the exact-name lookup in loadClass()
+		this.filteredClassNames = new HashSet<>(Arrays.asList(filteredClassNames));
+	}
+	/** Throws ClassNotFoundException for filtered names; all other names go to {@code super.loadClass(name, resolve)}. */
+	@Override
+	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+		synchronized (this) {
+			if (filteredClassNames.contains(name)) { // filtered: report the class as not found
+				throw new ClassNotFoundException(name);
+			}
+			else {
+				return super.loadClass(name, resolve);
+			}
+		}
+	}
+}