You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/25 16:55:53 UTC

[2/2] flink git commit: [FLINK-5626] Improved resource deallocation in RocksDBKeyedStateBackend

[FLINK-5626] Improved resource deallocation in RocksDBKeyedStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd9115ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd9115ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd9115ff

Branch: refs/heads/master
Commit: cd9115ffd8f93c8fdcdb1645893db4b7684de589
Parents: a811545
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 19 18:08:02 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Jan 25 17:55:44 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 158 ++++++++++---------
 .../streaming/state/RocksDBStateBackend.java    |  43 ++---
 .../state/RocksDBStateBackendConfigTest.java    |  19 +--
 .../flink/util/AbstractCloseableRegistry.java   |   4 +-
 .../java/org/apache/flink/util/IOUtils.java     |  50 +++++-
 .../checkpoint/StateAssignmentOperation.java    |   6 +
 .../runtime/state/StateBackendTestBase.java     |   7 +-
 7 files changed, 169 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index dccf3ac..61a796e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -56,9 +56,9 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
-
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
@@ -68,11 +68,9 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.Snapshot;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -96,15 +94,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
 
-	/** Operator identifier that is used to uniqueify the RocksDB storage path. */
-	private final String operatorIdentifier;
-
-	/** JobID for uniquifying backup paths. */
-	private final JobID jobId;
-
-	/** The options from the options factory, cached */
+	/** The column family options from the options factory */
 	private final ColumnFamilyOptions columnOptions;
 
+	/** The DB options from the options factory */
+	private final DBOptions dbOptions;
+
 	/** Path where this configured instance stores its data directory */
 	private final File instanceBasePath;
 
@@ -145,19 +140,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange
-	) throws Exception {
+	) throws IOException {
 
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
-		this.operatorIdentifier = operatorIdentifier;
-		this.jobId = jobId;
-		this.columnOptions = columnFamilyOptions;
+		this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions);
+		this.dbOptions = Preconditions.checkNotNull(dbOptions);
 
-		this.instanceBasePath = instanceBasePath;
+		this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
 		this.instanceRocksDBPath = new File(instanceBasePath, "db");
 
 		if (!instanceBasePath.exists()) {
 			if (!instanceBasePath.mkdirs()) {
-				throw new RuntimeException("Could not create RocksDB data directory.");
+				throw new IOException("Could not create RocksDB data directory.");
 			}
 		}
 
@@ -168,7 +162,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				FileUtils.deleteDirectory(instanceRocksDBPath);
 			}
 		} catch (IOException e) {
-			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+			throw new IOException("Error cleaning RocksDB data directory.", e);
 		}
 
 		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
@@ -176,9 +170,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
 		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
 		try {
-			db = RocksDB.open(dbOptions, instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+
+			db = RocksDB.open(
+					Preconditions.checkNotNull(dbOptions),
+					instanceRocksDBPath.getAbsolutePath(),
+					columnFamilyDescriptors,
+					columnFamilyHandles);
+
 		} catch (RocksDBException e) {
-			throw new RuntimeException("Error while opening RocksDB instance.", e);
+			throw new IOException("Error while opening RocksDB instance.", e);
 		}
 		keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
 		kvStateInformation = new HashMap<>();
@@ -200,21 +200,32 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				for (Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> column :
 						kvStateInformation.values()) {
-
-					column.f0.close();
+					try {
+						column.f0.close();
+					} catch (Exception ex) {
+						LOG.info("Exception while closing ColumnFamilyHandle object.", ex);
+					}
 				}
 
 				kvStateInformation.clear();
 
-				db.close();
+				try {
+					db.close();
+				} catch (Exception ex) {
+					LOG.info("Exception while closing RocksDB object.", ex);
+				}
+
 				db = null;
 			}
 		}
 
+		IOUtils.closeQuietly(columnOptions);
+		IOUtils.closeQuietly(dbOptions);
+
 		try {
 			FileUtils.deleteDirectory(instanceBasePath);
 		} catch (IOException ioex) {
-			LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath);
+			LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath, ioex);
 		}
 	}
 
@@ -245,14 +256,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		// hold the db lock while operation on the db to guard us against async db disposal
 		synchronized (asyncSnapshotLock) {
 
-			if (kvStateInformation.isEmpty()) {
-				LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
-						" . Returning null.");
+			if (db != null) {
 
-				return new DoneFuture<>(null);
-			}
+				if (kvStateInformation.isEmpty()) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
+								" . Returning null.");
+					}
+
+					return new DoneFuture<>(null);
+				}
 
-			if (db != null) {
 				snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
 			} else {
 				throw new IOException("RocksDB closed.");
@@ -328,9 +342,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private Snapshot snapshot;
 		private ReadOptions readOptions;
+		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
+
 		private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
 		private DataOutputView outputView;
-		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
 		private KeyGroupsStateHandle snapshotResultStateHandle;
 
 		RocksDBSnapshotOperation(
@@ -401,26 +416,26 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		/**
 		 * 5) Release the snapshot object for RocksDB and clean up.
-		 *
 		 */
 		public void releaseSnapshotResources(boolean canceled) {
+
 			if (null != kvStateIterators) {
 				for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
-					kvStateIterator.f0.close();
+					IOUtils.closeQuietly(kvStateIterator.f0);
 				}
 				kvStateIterators = null;
 			}
 
 			if (null != snapshot) {
-				if(null != stateBackend.db) {
+				if (null != stateBackend.db) {
 					stateBackend.db.releaseSnapshot(snapshot);
 				}
-				snapshot.close();
+				IOUtils.closeQuietly(snapshot);
 				snapshot = null;
 			}
 
 			if (null != readOptions) {
-				readOptions.close();
+				IOUtils.closeQuietly(readOptions);
 				readOptions = null;
 			}
 
@@ -477,8 +492,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				//retrieve iterator for this k/v states
 				readOptions = new ReadOptions();
 				readOptions.setSnapshot(snapshot);
-				RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
-				kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
+
+				kvStateIterators.add(
+						new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId));
+
 				++kvStateId;
 			}
 
@@ -493,12 +510,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			byte[] previousKey = null;
 			byte[] previousValue = null;
 
-			List<Tuple2<RocksIterator, Integer>> kvStateIteratorsHandover = this.kvStateIterators;
-			this.kvStateIterators = null;
-
 			// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
 			try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
-					kvStateIteratorsHandover, stateBackend.keyGroupPrefixBytes)) {
+					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
+
+				// handover complete, null out to prevent double close
+				kvStateIterators = null;
 
 				//preamble: setup with first key-group as our lookahead
 				if (mergeIterator.isValid()) {
@@ -584,7 +601,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		private static void checkInterrupted() throws InterruptedException {
-			if(Thread.currentThread().isInterrupted()) {
+			if (Thread.currentThread().isInterrupted()) {
 				throw new InterruptedException("RocksDB snapshot interrupted.");
 			}
 		}
@@ -674,7 +691,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			} finally {
 				if (currentStateHandleInStream != null) {
 					rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
-					currentStateHandleInStream.close();
+					IOUtils.closeQuietly(currentStateHandleInStream);
 				}
 			}
 		}
@@ -778,7 +795,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * that we checkpointed, i.e. is already in the map of column families.
 	 */
 	@SuppressWarnings("rawtypes, unchecked")
-	protected <N, S> ColumnFamilyHandle getColumnFamily(StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) {
+	protected <N, S> ColumnFamilyHandle getColumnFamily(
+			StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
 
 		Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
 				kvStateInformation.get(descriptor.getName());
@@ -790,12 +808,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				descriptor.getSerializer());
 
 		if (stateInfo != null) {
-			if (!newMetaInfo.isCompatibleWith(stateInfo.f1)) {
-				throw new RuntimeException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
+			if (newMetaInfo.isCompatibleWith(stateInfo.f1)) {
+				stateInfo.f1 = newMetaInfo;
+				return stateInfo.f0;
+			} else {
+				throw new IOException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
 						" trying access with " + newMetaInfo);
 			}
-			stateInfo.f1 = newMetaInfo;
-			return stateInfo.f0;
 		}
 
 		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
@@ -809,7 +828,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			rawAccess.put(descriptor.getName(), tuple);
 			return columnFamily;
 		} catch (RocksDBException e) {
-			throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
+			throw new IOException("Error creating ColumnFamilyHandle.", e);
 		}
 	}
 
@@ -866,21 +885,20 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator.
 	 * Used by #MergeIterator.
 	 */
-	static final class MergeIterator {
+	static final class MergeIterator implements AutoCloseable {
 
 		/**
-		 *
-		 * @param iterator The #RocksIterator to wrap .
+		 * @param iterator  The #RocksIterator to wrap .
 		 * @param kvStateId Id of the K/V state to which this iterator belongs.
 		 */
-		public MergeIterator(RocksIterator iterator, int kvStateId) {
+		MergeIterator(RocksIterator iterator, int kvStateId) {
 			this.iterator = Preconditions.checkNotNull(iterator);
 			this.currentKey = iterator.key();
 			this.kvStateId = kvStateId;
 		}
 
-		private byte[] currentKey;
 		private final RocksIterator iterator;
+		private byte[] currentKey;
 		private final int kvStateId;
 
 		public byte[] getCurrentKey() {
@@ -899,8 +917,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			return kvStateId;
 		}
 
+		@Override
 		public void close() {
-			this.iterator.close();
+			IOUtils.closeQuietly(iterator);
 		}
 	}
 
@@ -908,7 +927,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
 	 * The resulting iteration sequence is ordered by (key-group, kv-state).
 	 */
-	static final class RocksDBMergeIterator implements Closeable {
+	static final class RocksDBMergeIterator implements AutoCloseable {
 
 		private final PriorityQueue<MergeIterator> heap;
 		private final int keyGroupPrefixByteCount;
@@ -943,20 +962,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
 
 			if (kvStateIterators.size() > 0) {
-				this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+				PriorityQueue<MergeIterator> iteratorPriorityQueue =
+						new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
 
 				for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
-					RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
+					final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
 					rocksIterator.seekToFirst();
 					if (rocksIterator.isValid()) {
-						heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+						iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
 					} else {
-						rocksIterator.close();
+						IOUtils.closeQuietly(rocksIterator);
 					}
 				}
 
 				kvStateIterators.clear();
 
+				this.heap = iteratorPriorityQueue;
 				this.valid = !heap.isEmpty();
 				this.currentSubIterator = heap.poll();
 			} else {
@@ -991,7 +1012,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					detectNewKeyGroup(oldKey);
 				}
 			} else {
-				rocksIterator.close();
+				IOUtils.closeQuietly(rocksIterator);
 
 				if (heap.isEmpty()) {
 					currentSubIterator = null;
@@ -1082,16 +1103,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		@Override
 		public void close() {
+			IOUtils.closeQuietly(currentSubIterator);
+			currentSubIterator = null;
 
-			if (null != currentSubIterator) {
-				currentSubIterator.close();
-				currentSubIterator = null;
-			}
-
-			for (MergeIterator iterator : heap) {
-				iterator.close();
-			}
-
+			IOUtils.closeAllQuietly(heap);
 			heap.clear();
 		}
 	}
@@ -1148,7 +1163,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// the EOFException will get us out of this...
 			while (true) {
 				byte mappingByte = inputView.readByte();
-				ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte), null);
+				ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte),null);
+
 				byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
 
 				ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);

http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1e5620f..3bfe742 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -63,6 +63,7 @@ import static java.util.Objects.requireNonNull;
  * {@link #setOptions(OptionsFactory)}.
  */
 public class RocksDBStateBackend extends AbstractStateBackend {
+
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
@@ -104,10 +105,6 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	/** The options factory to create the RocksDB options in the cluster */
 	private OptionsFactory optionsFactory;
 
-	/** The options from the options factory, cached */
-	private transient DBOptions dbOptions;
-	private transient ColumnFamilyOptions columnOptions;
-
 	/** Whether we already lazily initialized our local storage directories. */
 	private transient boolean isInitialized = false;
 
@@ -392,39 +389,33 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * Gets the RocksDB {@link DBOptions} to be used for all RocksDB instances.
 	 */
 	public DBOptions getDbOptions() {
-		if (dbOptions == null) {
-			// initial options from pre-defined profile
-			DBOptions opt = predefinedOptions.createDBOptions();
+		// initial options from pre-defined profile
+		DBOptions opt = predefinedOptions.createDBOptions();
 
-			// add user-defined options, if specified
-			if (optionsFactory != null) {
-				opt = optionsFactory.createDBOptions(opt);
-			}
+		// add user-defined options, if specified
+		if (optionsFactory != null) {
+			opt = optionsFactory.createDBOptions(opt);
+		}
 
-			// add necessary default options
-			opt = opt.setCreateIfMissing(true);
+		// add necessary default options
+		opt = opt.setCreateIfMissing(true);
 
-			dbOptions = opt;
-		}
-		return dbOptions;
+		return opt;
 	}
 
 	/**
 	 * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances.
 	 */
 	public ColumnFamilyOptions getColumnOptions() {
-		if (columnOptions == null) {
-			// initial options from pre-defined profile
-			ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
-
-			// add user-defined options, if specified
-			if (optionsFactory != null) {
-				opt = optionsFactory.createColumnOptions(opt);
-			}
+		// initial options from pre-defined profile
+		ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
 
-			columnOptions = opt;
+		// add user-defined options, if specified
+		if (optionsFactory != null) {
+			opt = optionsFactory.createColumnOptions(opt);
 		}
-		return columnOptions;
+
+		return opt;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 9524352..463dd44 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -261,17 +261,18 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
 		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
 
-		DBOptions opt1 = rocksDbBackend.getDbOptions();
-		DBOptions opt2 = rocksDbBackend.getDbOptions();
+		try (
+				DBOptions optCreated = rocksDbBackend.getDbOptions();
+				DBOptions optReference = new DBOptions();
+				ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
 
-		assertEquals(opt1, opt2);
+			// check that our instance uses something that we configured
+			assertEquals(true, optCreated.disableDataSync());
+			// just ensure that we pickend an option that actually differs from the reference.
+			assertEquals(false, optReference.disableDataSync());
 
-		ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions();
-		ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions();
-
-		assertEquals(columnOpt1, columnOpt2);
-
-		assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle());
+			assertEquals(CompactionStyle.LEVEL, colCreated.compactionStyle());
+		}
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 7c0291c..85af982 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -88,9 +88,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 	public void close() throws IOException {
 		synchronized (getSynchronizationLock()) {
 
-			for (Closeable closeable : closeableToRef.keySet()) {
-				IOUtils.closeQuietly(closeable);
-			}
+			IOUtils.closeAllQuietly(closeableToRef.keySet());
 
 			closeableToRef.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 11c06a8..0bdc13a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.util;
 
 import org.slf4j.Logger;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -166,7 +165,7 @@ public final class IOUtils {
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+	 * Close the AutoCloseable objects and <b>ignore</b> any {@link Exception} or
 	 * null pointers. Must only be used for cleanup in exception handlers.
 	 * 
 	 * @param log
@@ -174,12 +173,12 @@ public final class IOUtils {
 	 * @param closeables
 	 *        the objects to close
 	 */
-	public static void cleanup(final Logger log, final java.io.Closeable... closeables) {
-		for (java.io.Closeable c : closeables) {
+	public static void cleanup(final Logger log, final AutoCloseable... closeables) {
+		for (AutoCloseable c : closeables) {
 			if (c != null) {
 				try {
 					c.close();
-				} catch (IOException e) {
+				} catch (Exception e) {
 					if (log != null && log.isDebugEnabled()) {
 						log.debug("Exception in closing " + c, e);
 					}
@@ -216,9 +215,48 @@ public final class IOUtils {
 	}
 
 	/**
+	 * Closes all {@link AutoCloseable} objects in the parameter, suppressing exceptions. Exception will be emitted
+	 * after calling close() on every object.
+	 *
+	 * @param closeables iterable with closeables to close.
+	 * @throws Exception collected exceptions that occurred during closing
+	 */
+	public static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {
+		if (null != closeables) {
+
+			Exception collectedExceptions = null;
+
+			for (AutoCloseable closeable : closeables) {
+				try {
+					if (null != closeable) {
+						closeable.close();
+					}
+				} catch (Exception e) {
+					collectedExceptions = ExceptionUtils.firstOrSuppressed(collectedExceptions, e);
+				}
+			}
+
+			if (null != collectedExceptions) {
+				throw collectedExceptions;
+			}
+		}
+	}
+
+	/**
+	 * Closes all elements in the iterable with closeQuietly().
+	 */
+	public static void closeAllQuietly(Iterable<? extends AutoCloseable> closeables) {
+		if (null != closeables) {
+			for (AutoCloseable closeable : closeables) {
+				closeQuietly(closeable);
+			}
+		}
+	}
+
+	/**
 	 * <p><b>Important:</b> This method is expected to never throw an exception.
 	 */
-	public static void closeQuietly(Closeable closeable) {
+	public static void closeQuietly(AutoCloseable closeable) {
 		try {
 			if (closeable != null) {
 				closeable.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 6d075db..3fda430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -335,6 +335,12 @@ public class StateAssignmentOperation {
 			List<OperatorStateHandle>[] chainParallelOpStates, ChainedStateHandle<OperatorStateHandle> chainOpState) {
 
 		if (null != chainOpState) {
+
+			int chainLength = chainOpState.getLength();
+			Preconditions.checkState(chainLength >= chainParallelOpStates.length,
+					"Found more states than operators in the chain. Chain length: " + chainLength +
+							", States: " + chainParallelOpStates.length);
+
 			for (int chainIdx = 0; chainIdx < chainParallelOpStates.length; ++chainIdx) {
 				OperatorStateHandle operatorState = chainOpState.get(chainIdx);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index c560ab0..b4bf664 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -53,6 +53,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -1052,7 +1053,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.value();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
+			} catch (IOException e) {
 				if (!e.getMessage().contains("Trying to access state using wrong")) {
 					fail("wrong exception " + e);
 				}
@@ -1103,7 +1104,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.get();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
+			} catch (IOException e) {
 				if (!e.getMessage().contains("Trying to access state using wrong")) {
 					fail("wrong exception " + e);
 				}
@@ -1156,7 +1157,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.get();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
+			} catch (IOException e) {
 				if (!e.getMessage().contains("Trying to access state using wrong ")) {
 					fail("wrong exception " + e);
 				}