You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/24 08:48:21 UTC
[2/3] flink git commit: [FLINK-8499] [core] Force Kryo to be
parent-first loaded.
[FLINK-8499] [core] Force Kryo to be parent-first loaded.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b466c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b466c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b466c05
Branch: refs/heads/master
Commit: 1b466c055d9d4cd481096af770118c7a899a90af
Parents: 6e894ee
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 19:58:10 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 09:47:36 2018 +0100
----------------------------------------------------------------------
.../apache/flink/configuration/CoreOptions.java | 2 +-
.../formats/avro/AvroKryoClassloadingTest.java | 89 ++++++++++++++++++++
.../core/testutils/FilteredClassLoader.java | 60 +++++++++++++
3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1b466c05/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index cd93b99..191a0aa 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -79,7 +79,7 @@ public class CoreOptions {
*/
public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER = ConfigOptions
.key("classloader.parent-first-patterns")
- .defaultValue("java.;scala.;org.apache.flink.;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
+ .defaultValue("java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
// ------------------------------------------------------------------------
// process parameters
http://git-wip-us.apache.org/repos/asf/flink/blob/1b466c05/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
new file mode 100644
index 0000000..6eaca15
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.testutils.FilteredClassLoader;
+import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.LinkedHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * This test makes sure that reversed (child-first) classloading works for the Avro/Kryo
+ * integration when Kryo is in the application jar file.
+ *
+ * <p>If Kryo is not loaded consistently through the same classloader (parent-first), the following
+ * error happens:
+ *
+ * <pre>
+ * java.lang.VerifyError: Bad type on operand stack
+ * Exception Details:
+ * Location:
+ * org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
+ * Reason:
+ * Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
+ * Current Frame:
+ * bci: @23
+ * flags: { }
+ * locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
+ * stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' }
+ * Bytecode:
+ * 0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
+ * 0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011
+ * 0x0000020: 57b1
+ * </pre>
+ */
+public class AvroKryoClassloadingTest {
+
+ @Test
+ public void testKryoInChildClasspath() throws Exception {
+ final Class<?> avroClass = AvroKryoSerializerUtils.class;
+
+ final URL avroLocation = avroClass.getProtectionDomain().getCodeSource().getLocation();
+ final URL kryoLocation = Kryo.class.getProtectionDomain().getCodeSource().getLocation();
+
+ final ClassLoader parentClassLoader = new FilteredClassLoader(
+ avroClass.getClassLoader(), AvroKryoSerializerUtils.class.getName());
+
+ final ClassLoader userAppClassLoader = FlinkUserCodeClassLoaders.childFirst(
+ new URL[] { avroLocation, kryoLocation },
+ parentClassLoader,
+ CoreOptions.ALWAYS_PARENT_FIRST_LOADER.defaultValue().split(";"));
+
+ final Class<?> userLoadedAvroClass = Class.forName(avroClass.getName(), false, userAppClassLoader);
+ assertNotEquals(avroClass, userLoadedAvroClass);
+
+ // reflectively call 'addAvroGenericDataArrayRegistration(...)' on the class loaded via userAppClassLoader
+ final Method m = userLoadedAvroClass.getMethod("addAvroGenericDataArrayRegistration", LinkedHashMap.class);
+
+ final LinkedHashMap<String, ?> map = new LinkedHashMap<>();
+ m.invoke(userLoadedAvroClass.newInstance(), map);
+
+ assertEquals(1, map.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1b466c05/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
new file mode 100644
index 0000000..f04393b
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+
+/**
+ * A ClassLoader that filters out certain classes (by name) and throws a ClassNotFoundException
+ * when they should be loaded.
+ *
+ * <p>This utility is useful when trying to eliminate certain classes from a class loader to
+ * force loading them through another class loader.
+ */
+public class FilteredClassLoader extends ClassLoader {
+
+ /** The set of class names for the filtered classes. */
+ private final HashSet<String> filteredClassNames;
+
+ /**
+ * Creates a new filtered classloader.
+ *
+ * @param delegate The class loader that is filtered by this classloader. Must not be null.
+ * @param filteredClassNames The class names to filter out (compared by exact name).
+ */
+ public FilteredClassLoader(ClassLoader delegate, String... filteredClassNames) {
+ super(Objects.requireNonNull(delegate));
+
+ this.filteredClassNames = new HashSet<>(Arrays.asList(filteredClassNames));
+ }
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ synchronized (this) {
+ if (filteredClassNames.contains(name)) {
+ throw new ClassNotFoundException(name);
+ }
+ else {
+ return super.loadClass(name, resolve);
+ }
+ }
+ }
+}