You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/06 10:32:20 UTC

[1/3] flink git commit: [hotfix] Use try-with-resources to ensure RocksIterator is always closed in RocksDBMapState.

Repository: flink
Updated Branches:
  refs/heads/master 2d872447d -> 7baf7649e


[hotfix] Use try-with-resources to ensure RocksIterator is always closed in RocksDBMapState.

This closes #5705.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7baf7649
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7baf7649
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7baf7649

Branch: refs/heads/master
Commit: 7baf7649e9ecd485d6f036b7755c2f98cca74e3a
Parents: 21cf59d
Author: sihuazhou <su...@163.com>
Authored: Wed Mar 28 00:43:55 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 16 +++--
 .../streaming/state/RocksDBMapState.java        | 71 ++++++++++----------
 2 files changed, 47 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7baf7649/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 31b9d99..3000667 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1313,10 +1313,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private static final List<Comparator<MergeIterator>> COMPARATORS;
 
 		static {
-			int maxBytes = 4;
+			int maxBytes = 2;
 			COMPARATORS = new ArrayList<>(maxBytes);
 			for (int i = 0; i < maxBytes; ++i) {
-				final int currentBytes = i;
+				final int currentBytes = i + 1;
 				COMPARATORS.add(new Comparator<MergeIterator>() {
 					@Override
 					public int compare(MergeIterator o1, MergeIterator o2) {
@@ -1330,9 +1330,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
 			Preconditions.checkNotNull(kvStateIterators);
+			Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
 			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
 
-			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
+			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
 
 			if (kvStateIterators.size() > 0) {
 				PriorityQueue<MergeIterator> iteratorPriorityQueue =
@@ -1837,10 +1839,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private Snapshot snapshot;
 		private ReadOptions readOptions;
 
-		/** The state meta data. */
+		/**
+		 * The state meta data.
+		 */
 		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
 
-		/** The copied column handle. */
+		/**
+		 * The copied column handle.
+		 */
 		private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
 
 		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;

http://git-wip-us.apache.org/repos/asf/flink/blob/7baf7649/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index c75a2ed..baa90fa 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -540,52 +540,53 @@ public class RocksDBMapState<K, N, UK, UV>
 				return;
 			}
 
-			RocksIterator iterator = db.newIterator(columnFamily);
-
-			/*
-			 * The iteration starts from the prefix bytes at the first loading. The cache then is
-			 * reloaded when the next entry to return is the last one in the cache. At that time,
-			 * we will start the iterating from the last returned entry.
- 			 */
-			RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
-			byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
-
-			cacheEntries.clear();
-			cacheIndex = 0;
-
-			iterator.seek(startBytes);
-
-			/*
-			 * If the last returned entry is not deleted, it will be the first entry in the
-			 * iterating. Skip it to avoid redundant access in such cases.
-			 */
-			if (lastEntry != null && !lastEntry.deleted) {
-				iterator.next();
-			}
-
-			while (true) {
-				if (!iterator.isValid() || !underSameKey(iterator.key())) {
-					expired = true;
-					break;
+			// Use try-with-resources to ensure the RocksIterator is released even if a runtime
+			// exception occurs in the code block below.
+			try (RocksIterator iterator = db.newIterator(columnFamily)) {
+
+				/*
+				 * The iteration starts from the prefix bytes at the first loading. The cache then is
+				 * reloaded when the next entry to return is the last one in the cache. At that time,
+				 * we will start the iterating from the last returned entry.
+ 				 */
+				RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
+				byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+
+				cacheEntries.clear();
+				cacheIndex = 0;
+
+				iterator.seek(startBytes);
+
+				/*
+				 * If the last returned entry is not deleted, it will be the first entry in the
+				 * iterating. Skip it to avoid redundant access in such cases.
+				 */
+				if (lastEntry != null && !lastEntry.deleted) {
+					iterator.next();
 				}
 
-				if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
-					break;
-				}
+				while (true) {
+					if (!iterator.isValid() || !underSameKey(iterator.key())) {
+						expired = true;
+						break;
+					}
 
-				RocksDBMapEntry entry = new RocksDBMapEntry(
+					if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
+						break;
+					}
+
+					RocksDBMapEntry entry = new RocksDBMapEntry(
 						db,
 						iterator.key(),
 						iterator.value(),
 						keySerializer,
 						valueSerializer);
 
-				cacheEntries.add(entry);
+					cacheEntries.add(entry);
 
-				iterator.next();
+					iterator.next();
+				}
 			}
-
-			iterator.close();
 		}
 
 		private boolean underSameKey(byte[] rawKeyBytes) {


[2/3] flink git commit: [FLINK-8699][state] Deep copy state info to avoid potential concurrency problem in full checkpoint.

Posted by sr...@apache.org.
[FLINK-8699][state] Deep copy state info to avoid potential concurrency problem in full checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21cf59d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21cf59d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21cf59d5

Branch: refs/heads/master
Commit: 21cf59d5fffdca9e8335e1990c75e0c3cd684212
Parents: f5071d7
Author: sihuazhou <su...@163.com>
Authored: Fri Mar 16 23:07:54 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 36 +++++++++++++-------
 1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21cf59d5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index cdeb608..31b9d99 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1836,7 +1836,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private Snapshot snapshot;
 		private ReadOptions readOptions;
-		private List<Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy;
+
+		/** The state meta data. */
+		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
+
+		/** The copied column handle. */
+		private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
+
 		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
 
 		private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
@@ -1860,7 +1866,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 */
 		public void takeDBSnapShot() {
 			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
-			this.kvStateInformationCopy = new ArrayList<>(stateBackend.kvStateInformation.values());
+
+			this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+			this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+				stateBackend.kvStateInformation.values()) {
+				// snapshot meta info
+				this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+
+				// copy column family handle
+				this.copiedColumnFamilyHandles.add(tuple2.f0);
+			}
 			this.snapshot = stateBackend.db.getSnapshot();
 		}
 
@@ -1946,10 +1964,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private void writeKVStateMetaData() throws IOException {
 
-			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
-				new ArrayList<>(kvStateInformationCopy.size());
-
-			this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size());
+			this.kvStateIterators = new ArrayList<>(copiedColumnFamilyHandles.size());
 
 			int kvStateId = 0;
 
@@ -1957,13 +1972,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			readOptions = new ReadOptions();
 			readOptions.setSnapshot(snapshot);
 
-			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
-				kvStateInformationCopy) {
-
-				metaInfoSnapshots.add(column.f1.snapshot());
+			for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) {
 
 				kvStateIterators.add(
-					new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
+					new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId));
 
 				++kvStateId;
 			}
@@ -1971,7 +1983,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			KeyedBackendSerializationProxy<K> serializationProxy =
 				new KeyedBackendSerializationProxy<>(
 					stateBackend.getKeySerializer(),
-					metaInfoSnapshots,
+					stateMetaInfoSnapshots,
 					!Objects.equals(
 						UncompressedStreamCompressionDecorator.INSTANCE,
 						stateBackend.keyGroupCompressionDecorator));


[3/3] flink git commit: [FLINK-8968][state] Pull the creation of readOptions out of loop to avoid native resource leak.

Posted by sr...@apache.org.
[FLINK-8968][state] Pull the creation of readOptions out of loop to avoid native resource leak.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5071d7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5071d7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5071d7a

Branch: refs/heads/master
Commit: f5071d7a6128f78e7f0ab7ccd8813d30543fb3a2
Parents: 2d87244
Author: sihuazhou <su...@163.com>
Authored: Thu Mar 15 23:57:11 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java   | 9 +++++----
 .../streaming/state/RocksDBStateBackendConfigTest.java      | 8 ++++++--
 2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5071d7a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6a23181..cdeb608 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1952,15 +1952,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size());
 
 			int kvStateId = 0;
+
+			//retrieve iterator for this k/v states
+			readOptions = new ReadOptions();
+			readOptions.setSnapshot(snapshot);
+
 			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
 				kvStateInformationCopy) {
 
 				metaInfoSnapshots.add(column.f1.snapshot());
 
-				//retrieve iterator for this k/v states
-				readOptions = new ReadOptions();
-				readOptions.setSnapshot(snapshot);
-
 				kvStateIterators.add(
 					new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f5071d7a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 2dd67f5..65d5b2e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -299,7 +299,9 @@ public class RocksDBStateBackendConfigTest {
 		});
 
 		assertNotNull(rocksDbBackend.getOptions());
-		assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle());
+		try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
+			assertEquals(CompactionStyle.FIFO, colCreated.compactionStyle());
+		}
 	}
 
 	@Test
@@ -324,7 +326,9 @@ public class RocksDBStateBackendConfigTest {
 
 		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
 		assertNotNull(rocksDbBackend.getOptions());
-		assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle());
+		try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
+			assertEquals(CompactionStyle.UNIVERSAL, colCreated.compactionStyle());
+		}
 	}
 
 	@Test