You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/16 21:28:26 UTC

[flink] 04/05: [FLINK-16917][orc] Workaround for classloader leak in OrcFile.

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6c130daaf59e77b343d1c947822ea0573738a204
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Aug 4 11:37:20 2020 +0200

    [FLINK-16917][orc] Workaround for classloader leak in OrcFile.
    
    See https://issues.apache.org/jira/browse/ORC-653
    OrcFile#getStaticMemoryManager caches the initial configuration and thereby leaks the classloader held in it. Thus, new Flink jobs implicitly use the classloader of the first job.
    
    By adding ThreadLocalClassLoaderConfiguration, which forces the use of the current thread's context classloader over the initial classloader, Flink jobs use the appropriate classloader at the cost of higher runtime overhead (no caching).
    This commit should be reverted once the bug in ORC is fixed.
---
 .../flink/connectors/hive/HiveTableSink.java       |  7 ++-
 .../flink/orc/writer/OrcBulkWriterFactory.java     |  2 +-
 .../ThreadLocalClassLoaderConfiguration.java       | 59 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 2ffb792..cfe13b9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -25,6 +25,7 @@ import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
 import org.apache.flink.connectors.hive.write.HiveWriterFactory;
 import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
 import org.apache.flink.orc.OrcSplitReaderUtil;
+import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
@@ -255,12 +256,14 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 			formatTypes[i] = tableSchema.getFieldDataType(i).get().getLogicalType();
 		}
 		RowType formatType = RowType.of(formatTypes, formatNames);
-		Configuration formatConf = new Configuration(jobConf);
-		sd.getSerdeInfo().getParameters().forEach(formatConf::set);
 		if (serLib.contains("parquet")) {
+			Configuration formatConf = new Configuration(jobConf);
+			sd.getSerdeInfo().getParameters().forEach(formatConf::set);
 			return Optional.of(ParquetRowDataBuilder.createWriterFactory(
 					formatType, formatConf, hiveVersion.startsWith("3.")));
 		} else if (serLib.contains("orc")) {
+			Configuration formatConf = new ThreadLocalClassLoaderConfiguration(jobConf);
+			sd.getSerdeInfo().getParameters().forEach(formatConf::set);
 			TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(formatType);
 			return Optional.of(hiveShim.createOrcBulkWriterFactory(
 					formatConf, typeDescription.toString(), formatTypes));
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
index b34ef07..b5069d6 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
@@ -111,7 +111,7 @@ public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
 
 	private OrcFile.WriterOptions getWriterOptions() {
 		if (null == writerOptions) {
-			Configuration conf = new Configuration();
+			Configuration conf = new ThreadLocalClassLoaderConfiguration();
 			for (Map.Entry<String, String> entry : confMap.entrySet()) {
 				conf.set(entry.getKey(), entry.getValue());
 			}
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/ThreadLocalClassLoaderConfiguration.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/ThreadLocalClassLoaderConfiguration.java
new file mode 100644
index 0000000..708c413
--- /dev/null
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/ThreadLocalClassLoaderConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URL;
+
+/**
+ * Workaround for https://issues.apache.org/jira/browse/ORC-653.
+ *
+ * <p>Since the conf is effectively cached across Flink jobs, every class and resource lookup is resolved through the
+ * calling thread's context classloader (rather than the initial classloader) to avoid classloader leaks.
+ */
+@Internal
+public final class ThreadLocalClassLoaderConfiguration extends Configuration {
+	public ThreadLocalClassLoaderConfiguration() {
+	}
+
+	public ThreadLocalClassLoaderConfiguration(Configuration other) {
+		super(other);
+	}
+
+	@Override
+	public ClassLoader getClassLoader() {
+		return Thread.currentThread().getContextClassLoader();
+	}
+
+	@Override
+	public Class<?> getClassByNameOrNull(String name) {
+		try {
+			return Class.forName(name, true, getClassLoader());
+		} catch (ClassNotFoundException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public URL getResource(String name) {
+		return getClassLoader().getResource(name);
+	}
+}