You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/12 17:52:34 UTC

[1/2] flink git commit: [hotfix] [tests] Add re-tries to the result verification via files.

Repository: flink
Updated Branches:
  refs/heads/master a078666d4 -> 57f7747bb


[hotfix] [tests] Add re-tries to the result verification via files.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57f7747b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57f7747b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57f7747b

Branch: refs/heads/master
Commit: 57f7747bb5fa21be5c91338ec3c3aa7ffcecb59f
Parents: 609c046
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 9 17:53:14 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 12 18:35:40 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/test/util/TestBaseUtils.java   | 84 +++++++++++++-------
 1 file changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57f7747b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 5e15076..b8470b3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -22,8 +22,10 @@ import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -32,10 +34,14 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
+
 import org.apache.hadoop.fs.FileSystem;
+
 import org.junit.Assert;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
@@ -213,8 +219,11 @@ public class TestBaseUtils extends TestLogger {
 		return getResultReader(resultPath, new String[]{}, false);
 	}
 
-	public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
-											boolean inOrderOfFiles) throws IOException {
+	public static BufferedReader[] getResultReader(
+			String resultPath,
+			String[] excludePrefixes,
+			boolean inOrderOfFiles) throws IOException {
+
 		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 
 		if (inOrderOfFiles) {
@@ -268,8 +277,11 @@ public class TestBaseUtils extends TestLogger {
 		readAllResultLines(target, resultPath, excludePrefixes, false);
 	}
 
-	public static void readAllResultLines(List<String> target, String resultPath,
-											String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+	public static void readAllResultLines(
+			List<String> target,
+			String resultPath,
+			String[] excludePrefixes,
+			boolean inOrderOfFiles) throws IOException {
 
 		final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
 		try {
@@ -282,12 +294,7 @@ public class TestBaseUtils extends TestLogger {
 		}
 		finally {
 			for (BufferedReader reader : readers) {
-				try {
-					reader.close();
-				}
-				catch (Exception e) {
-					// ignore, this is best-effort cleanup
-				}
+				org.apache.flink.util.IOUtils.closeQuietly(reader);
 			}
 		}
 	}
@@ -296,19 +303,42 @@ public class TestBaseUtils extends TestLogger {
 		compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[0]);
 	}
 
-	public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
-											String[] excludePrefixes) throws Exception {
-		ArrayList<String> list = new ArrayList<>();
-		readAllResultLines(list, resultPath, excludePrefixes, false);
+	public static void compareResultsByLinesInMemory(
+			String expectedResultStr,
+			String resultPath,
+			String[] excludePrefixes) throws Exception {
 
-		String[] result = list.toArray(new String[list.size()]);
-		Arrays.sort(result);
+		// because of some strange I/O inconsistency effects on CI infrastructure, the result
+		// files are read and compared up to 'numAttempts' times before a failure is reported
+		final int numAttempts = 5;
+		int attempt = 0;
+		final String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
+		Arrays.sort(expected);
+
+		while (true) {
+			try {
+				ArrayList<String> list = new ArrayList<>();
+				readAllResultLines(list, resultPath, excludePrefixes, false);
 
-		String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
-		Arrays.sort(expected);
+				String[] result = list.toArray(new String[list.size()]);
+				Arrays.sort(result);
 
-		Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
-		Assert.assertArrayEquals(expected, result);
+				Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
+				Assert.assertArrayEquals(expected, result);
+
+				break;
+			}
+			catch (AssertionError e) {
+				if (++attempt >= numAttempts) { // '>=' => exactly 'numAttempts' attempts in total
+					throw e;
+				}
+
+				// attempts remain: wait, then fall through the loop and try again
+				// on normal setups, this should change nothing, but it seems to help the
+				// Travis CI container infrastructure
+				Thread.sleep(100);
+			}
+		}
 	}
 
 	public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
@@ -390,19 +420,17 @@ public class TestBaseUtils extends TestLogger {
 		}
 	}
 
-	private static File[] getAllInvolvedFiles(String resultPath, String[] excludePrefixes) {
-		final String[] exPrefs = excludePrefixes;
-		File result = asFile(resultPath);
-		if (!result.exists()) {
-			Assert.fail("Result file was not written");
-		}
+	private static File[] getAllInvolvedFiles(String resultPath, final String[] excludePrefixes) {
+		final File result = asFile(resultPath);
+		assertTrue("Result file was not written", result.exists());
+
 		if (result.isDirectory()) {
 			return result.listFiles(new FilenameFilter() {
 
 				@Override
 				public boolean accept(File dir, String name) {
-					for(String p: exPrefs) {
-						if(name.startsWith(p)) {
+					for (String p: excludePrefixes) {
+						if (name.startsWith(p)) {
 							return false;
 						}
 					}


[2/2] flink git commit: [FLINK-5310] [RocksDB] Harden the JNI library loading

Posted by se...@apache.org.
[FLINK-5310] [RocksDB] Harden the JNI library loading


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/609c046d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/609c046d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/609c046d

Branch: refs/heads/master
Commit: 609c046dba20cd07d9480715cfd1a6d78ed3a611
Parents: a078666
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 9 17:47:25 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 12 18:35:40 2016 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  2 -
 .../streaming/state/RocksDBStateBackend.java    | 75 ++++++++++++++++++++
 .../streaming/state/RocksDBInitResetTest.java   | 32 +++++++++
 .../state/RocksDBStateBackendConfigTest.java    | 14 +++-
 4 files changed, 119 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 4db622d..8637f6b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -149,8 +149,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.instanceBasePath = instanceBasePath;
 		this.instanceRocksDBPath = new File(instanceBasePath, "db");
 
-		RocksDB.loadLibrary();
-
 		if (!instanceBasePath.exists()) {
 			if (!instanceBasePath.mkdirs()) {
 				throw new RuntimeException("Could not create RocksDB data directory.");

http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9ba0dc1..2109cea 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,11 +34,14 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,6 +70,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
 
+	private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
+
+	private static boolean rocksDbInitialized = false;
+
 	// ------------------------------------------------------------------------
 	//  Static configuration values
 	// ------------------------------------------------------------------------
@@ -229,6 +237,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			KeyGroupRange keyGroupRange,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
 
+		// first, make sure that the RocksDB JNI library is loaded
+		// we do this explicitly here to have better error handling
+		String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
+		ensureRocksDBIsLoaded(tempDir);
+
 		lazyInitializeForJob(env, operatorIdentifier);
 
 		File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString());
@@ -257,6 +270,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			Collection<KeyGroupsStateHandle> restoredState,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
 
+		// first, make sure that the RocksDB JNI library is loaded
+		// we do this explicitly here to have better error handling
+		String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
+		ensureRocksDBIsLoaded(tempDir);
+
 		lazyInitializeForJob(env, operatorIdentifier);
 
 		File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString());
@@ -452,4 +470,61 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			", checkpointStreamBackend=" + checkpointStreamBackend +
 			'}';
 	}
+
+	// ------------------------------------------------------------------------
+	//  static library loading utilities
+	// ------------------------------------------------------------------------
+
+	private void ensureRocksDBIsLoaded(String tempDirectory) throws Exception {
+		// serialize on a Flink runtime class, which can never be shipped in a user JAR
+		synchronized (org.apache.flink.runtime.taskmanager.Task.class) {
+			if (rocksDbInitialized) {
+				return;
+			}
+
+			final File libraryDir = new File(tempDirectory);
+			final String libraryPath = libraryDir.getAbsolutePath();
+			LOG.info("Attempting to load RocksDB native library and store it at '{}'", libraryPath);
+
+			Throwable lastFailure = null;
+			int attempt = 0;
+			while (++attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS) {
+				try {
+					// the target directory must exist; the mkdirs() result itself is not needed
+					// noinspection ResultOfMethodCallIgnored
+					libraryDir.mkdirs();
+
+					// load the JNI library via RocksDB's loader, storing it under libraryPath
+					NativeLibraryLoader.getInstance().loadLibrary(libraryPath);
+					// RocksDB's own initialization serves as a check that the load succeeded
+					RocksDB.loadLibrary();
+
+					LOG.info("Successfully loaded RocksDB native library");
+					rocksDbInitialized = true;
+					return;
+				}
+				catch (Throwable failure) {
+					lastFailure = failure;
+					LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, failure);
+
+					// clear the loader's 'initialized' flag so the next iteration loads afresh
+					try {
+						resetRocksDBLoadedFlag();
+					}
+					catch (Throwable resetFailure) {
+						LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", resetFailure);
+					}
+				}
+			}
+			// all attempts failed: report the cause of the last one
+			throw new Exception("Could not load the native RocksDB library", lastFailure);
+		}
+	}
+
+	@VisibleForTesting
+	static void resetRocksDBLoadedFlag() throws Exception {
+		final Field initializedFlag = NativeLibraryLoader.class.getDeclaredField("initialized");
+		initializedFlag.setAccessible(true);
+		initializedFlag.setBoolean(null, false); // null target: 'initialized' is a static field
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
new file mode 100644
index 0000000..7343b56
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.junit.Test;
+
+/**
+ * Checks that RocksDB's NativeLibraryLoader still has an 'initialized' flag that can be reset via reflection.
+ */
+public class RocksDBInitResetTest {
+
+	@Test
+	public void testResetInitFlag() throws Exception {
+		RocksDBStateBackend.resetRocksDBLoadedFlag();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/609c046d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index bf9b315..07fb48e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -22,12 +22,14 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.OperatingSystem;
 import org.junit.Assume;
 import org.junit.Before;
@@ -95,7 +97,7 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath());
 		assertArrayEquals(new String[] { testDir1.getAbsolutePath(), testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths());
 
-		Environment env = getMockEnvironment(new File[] {});
+		Environment env = getMockEnvironment();
 		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
 				createKeyedStateBackend(
 						env,
@@ -360,6 +362,11 @@ public class RocksDBStateBackendConfigTest {
 	}
 
 	private static Environment getMockEnvironment(File[] tempDirs) {
+		final String[] tempDirStrings = new String[tempDirs.length];
+		for (int i = 0; i < tempDirs.length; i++) {
+			tempDirStrings[i] = tempDirs[i].getAbsolutePath();
+		}
+
 		IOManager ioMan = mock(IOManager.class);
 		when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
 
@@ -371,8 +378,11 @@ public class RocksDBStateBackendConfigTest {
 
 		TaskInfo taskInfo = mock(TaskInfo.class);
 		when(env.getTaskInfo()).thenReturn(taskInfo);
-
 		when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
+
+		TaskManagerRuntimeInfo tmInfo = new TaskManagerRuntimeInfo("localhost", new Configuration(), tempDirStrings);
+		when(env.getTaskManagerInfo()).thenReturn(tmInfo);
+
 		return env;
 	}
 }