Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/14 19:08:07 UTC

[GitHub] [flink] zentol commented on a change in pull request #14372: [FLINK-19259][Kinesis] Remove references to allow classloader unloading

zentol commented on a change in pull request #14372:
URL: https://github.com/apache/flink/pull/14372#discussion_r542643500



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
##########
@@ -128,12 +129,11 @@
 	 *
 	 * <p>The release hook is executed just before the user code class loader is being released.
 	 * Registration only happens if no hook has been registered under this name already.
-	 *

Review comment:
       revert

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
 			}
 		}
 	}
+
+	/**
+	 * Remove references created by the producer, preventing the classloader to unload. References were
+	 * analyzed as of version 0.14.0.
+	 */
+	private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+		// unregister admin mbean
+		AwsSdkMetrics.unregisterMetricAdminMBean();
+
+		try {
+			// Remove FileAgeManager
+			Class<?> fileAgeManagerClazz = Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true, classLoader);

Review comment:
       Shouldn't we be able to use `getClass().getClassLoader()`? Then we wouldn't have to modify the `RuntimeContext` API.
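
A minimal sketch of the lookup suggested above, not the PR's code: it resolves a class through the loader that defined the calling class, so no `ClassLoader` has to be passed in. `OwnClassLoaderLookup` and `Holder` are hypothetical stand-ins (for the producer and for a library class such as `FileAgeManager`), and the sketch assumes both are loaded by the same user-code class loader.

```java
public class OwnClassLoaderLookup {

    /** Hypothetical stand-in for a library class that must be resolved reflectively. */
    static class Holder {
        static Object instance = new Object();
    }

    /**
     * Resolves a class by name through the loader that defined this class,
     * instead of receiving the loader as a method argument.
     */
    Class<?> resolve(String className) {
        ClassLoader ownLoader = getClass().getClassLoader();
        try {
            return Class.forName(className, true, ownLoader);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class not visible to own loader: " + className, e);
        }
    }

    public static void main(String[] args) {
        OwnClassLoaderLookup lookup = new OwnClassLoaderLookup();
        Class<?> resolved = lookup.resolve(Holder.class.getName());
        // Both classes come from the same loader, so the identical Class object is returned.
        System.out.println(resolved == Holder.class);
    }
}
```

This only works if the class doing the lookup is itself loaded by the class loader that is about to be released, which is the assumption behind the question above.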

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
 			}
 		}
 	}
+
+	/**
+	 * Remove references created by the producer, preventing the classloader to unload. References were
+	 * analyzed as of version 0.14.0.
+	 */
+	private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+		// unregister admin mbean

Review comment:
       ```suggestion
   ```

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
 			}
 		}
 	}
+
+	/**
+	 * Remove references created by the producer, preventing the classloader to unload. References were
+	 * analyzed as of version 0.14.0.

Review comment:
       It would be good to also state which version of `aws-java-sdk-core` was analyzed.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
 			}
 		}
 	}
+
+	/**
+	 * Remove references created by the producer, preventing the classloader to unload. References were
+	 * analyzed as of version 0.14.0.
+	 */
+	private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+		// unregister admin mbean
+		AwsSdkMetrics.unregisterMetricAdminMBean();
+
+		try {
+			// Remove FileAgeManager
+			Class<?> fileAgeManagerClazz = Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true, classLoader);
+			Field instanceField = fileAgeManagerClazz.getDeclaredField("instance");
+			instanceField.setAccessible(true);
+
+			// unset (static final) field FileAgeManager.instance
+			Field modifiersField = Field.class.getDeclaredField("modifiers");
+			modifiersField.setAccessible(true);
+			modifiersField.setInt(instanceField, instanceField.getModifiers() & ~Modifier.FINAL);
+			Object fileAgeManager = instanceField.get(null);
+			instanceField.set(null, null);
+
+			// shutdown thread pool

Review comment:
       This should be the key change necessary to ensure the ClassLoader can be cleaned up.
   We shouldn't need to touch the FileAgeManager#instance reference; as long as no running thread still holds references to these classes, we should be good.
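
To illustrate the point above, here is a hedged sketch of a thread pool shutdown that waits for worker threads to exit, so they no longer keep classes of the user-code class loader reachable. `ExecutorShutdownSketch` and `shutdownAndAwait` are hypothetical names; the fallback to `shutdownNow()` is an addition for illustration and is not part of the diff under review.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {

    /**
     * Stops accepting new tasks and waits for the workers to finish.
     * Returns true if the pool terminated within the allowed time.
     */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                // Tasks are still running: interrupt them and wait once more.
                executor.shutdownNow();
                return executor.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> { });
        System.out.println(shutdownAndAwait(pool, 5, TimeUnit.SECONDS));
    }
}
```

Once the pool has terminated, its worker threads are gone, and with them the thread-held references that would otherwise keep the class loader alive.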

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
 			}
 		}
 	}
+
+	/**
+	 * Remove references created by the producer, preventing the classloader to unload. References were
+	 * analyzed as of version 0.14.0.
+	 */
+	private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+		// unregister admin mbean
+		AwsSdkMetrics.unregisterMetricAdminMBean();
+
+		try {
+			// Remove FileAgeManager
+			Class<?> fileAgeManagerClazz = Class.forName("com.amazonaws.services.kinesis.producer.FileAgeManager", true, classLoader);
+			Field instanceField = fileAgeManagerClazz.getDeclaredField("instance");
+			instanceField.setAccessible(true);
+
+			// unset (static final) field FileAgeManager.instance
+			Field modifiersField = Field.class.getDeclaredField("modifiers");
+			modifiersField.setAccessible(true);
+			modifiersField.setInt(instanceField, instanceField.getModifiers() & ~Modifier.FINAL);
+			Object fileAgeManager = instanceField.get(null);
+			instanceField.set(null, null);
+
+			// shutdown thread pool
+			Field executorField = fileAgeManagerClazz.getDeclaredField("executorService");
+			executorField.setAccessible(true);
+			ExecutorService executorService = (ExecutorService) executorField.get(fileAgeManager);
+			executorService.shutdown();
+			executorService.awaitTermination(1, TimeUnit.MINUTES);
+
+			// Remove InstanceProfileCredentialsProvider
+			Class<?> credProviderClazz = Class.forName("com.amazonaws.auth.InstanceProfileCredentialsProvider", true, classLoader);

Review comment:
       Why are we clearing this singleton?

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
##########
@@ -423,4 +432,43 @@ private void flushSync() throws Exception {
 			}
 		}
 	}
+
+	/**
+	 * Remove references created by the producer, preventing the classloader to unload. References were
+	 * analyzed as of version 0.14.0.
+	 */
+	private void runClassLoaderReleaseHook(ClassLoader classLoader) {
+		// unregister admin mbean
+		AwsSdkMetrics.unregisterMetricAdminMBean();
+
+		try {
+			// Remove FileAgeManager

Review comment:
       seems outdated?



