You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/12 17:52:35 UTC
[2/2] flink git commit: [FLINK-5310] [RocksDB] Harden the JNI library loading
[FLINK-5310] [RocksDB] Harden the JNI library loading
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/609c046d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/609c046d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/609c046d
Branch: refs/heads/master
Commit: 609c046dba20cd07d9480715cfd1a6d78ed3a611
Parents: a078666
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 9 17:47:25 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 12 18:35:40 2016 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 2 -
.../streaming/state/RocksDBStateBackend.java | 75 ++++++++++++++++++++
.../streaming/state/RocksDBInitResetTest.java | 32 +++++++++
.../state/RocksDBStateBackendConfigTest.java | 14 +++-
4 files changed, 119 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 4db622d..8637f6b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -149,8 +149,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.instanceBasePath = instanceBasePath;
this.instanceRocksDBPath = new File(instanceBasePath, "db");
- RocksDB.loadLibrary();
-
if (!instanceBasePath.exists()) {
if (!instanceBasePath.mkdirs()) {
throw new RuntimeException("Could not create RocksDB data directory.");
http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9ba0dc1..2109cea 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -17,6 +17,7 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateBackend;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,11 +34,14 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -66,6 +70,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
+ private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
+
+ private static boolean rocksDbInitialized = false;
+
// ------------------------------------------------------------------------
// Static configuration values
// ------------------------------------------------------------------------
@@ -229,6 +237,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception {
+ // first, make sure that the RocksDB JNI library is loaded
+ // we do this explicitly here to have better error handling
+ String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
+ ensureRocksDBIsLoaded(tempDir);
+
lazyInitializeForJob(env, operatorIdentifier);
File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString());
@@ -257,6 +270,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
Collection<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception {
+ // first, make sure that the RocksDB JNI library is loaded
+ // we do this explicitly here to have better error handling
+ String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
+ ensureRocksDBIsLoaded(tempDir);
+
lazyInitializeForJob(env, operatorIdentifier);
File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString());
@@ -452,4 +470,61 @@ public class RocksDBStateBackend extends AbstractStateBackend {
", checkpointStreamBackend=" + checkpointStreamBackend +
'}';
}
+
+ // ------------------------------------------------------------------------
+ // static library loading utilities
+ // ------------------------------------------------------------------------
+
+ private void ensureRocksDBIsLoaded(String tempDirectory) throws Exception {
+ // lock on something that cannot be in the user JAR
+ synchronized (org.apache.flink.runtime.taskmanager.Task.class) {
+ if (!rocksDbInitialized) {
+
+ final File tempDirFile = new File(tempDirectory);
+ final String path = tempDirFile.getAbsolutePath();
+
+ LOG.info("Attempting to load RocksDB native library and store it at '{}'", path);
+
+ Throwable lastException = null;
+ for (int attempt = 1; attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS; attempt++) {
+ try {
+ // make sure the temp path exists
+ // noinspection ResultOfMethodCallIgnored
+ tempDirFile.mkdirs();
+
+ // explicitly load the JNI dependency if it has not been loaded before
+ NativeLibraryLoader.getInstance().loadLibrary(path);
+
+ // this initialization here should validate that the loading succeeded
+ RocksDB.loadLibrary();
+
+ // seems to have worked
+ LOG.info("Successfully loaded RocksDB native library");
+ rocksDbInitialized = true;
+ return;
+ }
+ catch (Throwable t) {
+ lastException = t;
+ LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, t);
+
+ // try to force RocksDB to attempt reloading the library
+ try {
+ resetRocksDBLoadedFlag();
+ } catch (Throwable tt) {
+ LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", tt);
+ }
+ }
+ }
+
+ throw new Exception("Could not load the native RocksDB library", lastException);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ static void resetRocksDBLoadedFlag() throws Exception {
+ final Field initField = org.rocksdb.NativeLibraryLoader.class.getDeclaredField("initialized");
+ initField.setAccessible(true);
+ initField.setBoolean(null, false);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
new file mode 100644
index 0000000..7343b56
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.junit.Test;
+
+/**
+ * This test checks that the RocksDB native code loader still responds to resetting the 'initialized' flag.
+ */
+public class RocksDBInitResetTest {
+
+ @Test
+ public void testResetInitFlag() throws Exception {
+ RocksDBStateBackend.resetRocksDBLoadedFlag();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index bf9b315..07fb48e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -22,12 +22,14 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.OperatingSystem;
import org.junit.Assume;
import org.junit.Before;
@@ -95,7 +97,7 @@ public class RocksDBStateBackendConfigTest {
rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath());
assertArrayEquals(new String[] { testDir1.getAbsolutePath(), testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths());
- Environment env = getMockEnvironment(new File[] {});
+ Environment env = getMockEnvironment();
RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
createKeyedStateBackend(
env,
@@ -360,6 +362,11 @@ public class RocksDBStateBackendConfigTest {
}
private static Environment getMockEnvironment(File[] tempDirs) {
+ final String[] tempDirStrings = new String[tempDirs.length];
+ for (int i = 0; i < tempDirs.length; i++) {
+ tempDirStrings[i] = tempDirs[i].getAbsolutePath();
+ }
+
IOManager ioMan = mock(IOManager.class);
when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
@@ -371,8 +378,11 @@ public class RocksDBStateBackendConfigTest {
TaskInfo taskInfo = mock(TaskInfo.class);
when(env.getTaskInfo()).thenReturn(taskInfo);
-
when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
+
+ TaskManagerRuntimeInfo tmInfo = new TaskManagerRuntimeInfo("localhost", new Configuration(), tempDirStrings);
+ when(env.getTaskManagerInfo()).thenReturn(tmInfo);
+
return env;
}
}