You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/25 16:55:53 UTC
[2/2] flink git commit: [FLINK-5626] Improved resource deallocation
in RocksDBKeyedStateBackend
[FLINK-5626] Improved resource deallocation in RocksDBKeyedStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd9115ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd9115ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd9115ff
Branch: refs/heads/master
Commit: cd9115ffd8f93c8fdcdb1645893db4b7684de589
Parents: a811545
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 19 18:08:02 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Jan 25 17:55:44 2017 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 158 ++++++++++---------
.../streaming/state/RocksDBStateBackend.java | 43 ++---
.../state/RocksDBStateBackendConfigTest.java | 19 +--
.../flink/util/AbstractCloseableRegistry.java | 4 +-
.../java/org/apache/flink/util/IOUtils.java | 50 +++++-
.../checkpoint/StateAssignmentOperation.java | 6 +
.../runtime/state/StateBackendTestBase.java | 7 +-
7 files changed, 169 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index dccf3ac..61a796e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -56,9 +56,9 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
-
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
@@ -68,11 +68,9 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
@@ -96,15 +94,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
- /** Operator identifier that is used to uniqueify the RocksDB storage path. */
- private final String operatorIdentifier;
-
- /** JobID for uniquifying backup paths. */
- private final JobID jobId;
-
- /** The options from the options factory, cached */
+ /** The column family options from the options factory */
private final ColumnFamilyOptions columnOptions;
+ /** The DB options from the options factory */
+ private final DBOptions dbOptions;
+
/** Path where this configured instance stores its data directory */
private final File instanceBasePath;
@@ -145,19 +140,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange
- ) throws Exception {
+ ) throws IOException {
super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
- this.operatorIdentifier = operatorIdentifier;
- this.jobId = jobId;
- this.columnOptions = columnFamilyOptions;
+ this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions);
+ this.dbOptions = Preconditions.checkNotNull(dbOptions);
- this.instanceBasePath = instanceBasePath;
+ this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
this.instanceRocksDBPath = new File(instanceBasePath, "db");
if (!instanceBasePath.exists()) {
if (!instanceBasePath.mkdirs()) {
- throw new RuntimeException("Could not create RocksDB data directory.");
+ throw new IOException("Could not create RocksDB data directory.");
}
}
@@ -168,7 +162,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
FileUtils.deleteDirectory(instanceRocksDBPath);
}
} catch (IOException e) {
- throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+ throw new IOException("Error cleaning RocksDB data directory.", e);
}
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
@@ -176,9 +170,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
try {
- db = RocksDB.open(dbOptions, instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+
+ db = RocksDB.open(
+ Preconditions.checkNotNull(dbOptions),
+ instanceRocksDBPath.getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilyHandles);
+
} catch (RocksDBException e) {
- throw new RuntimeException("Error while opening RocksDB instance.", e);
+ throw new IOException("Error while opening RocksDB instance.", e);
}
keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
kvStateInformation = new HashMap<>();
@@ -200,21 +200,32 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> column :
kvStateInformation.values()) {
-
- column.f0.close();
+ try {
+ column.f0.close();
+ } catch (Exception ex) {
+ LOG.info("Exception while closing ColumnFamilyHandle object.", ex);
+ }
}
kvStateInformation.clear();
- db.close();
+ try {
+ db.close();
+ } catch (Exception ex) {
+ LOG.info("Exception while closing RocksDB object.", ex);
+ }
+
db = null;
}
}
+ IOUtils.closeQuietly(columnOptions);
+ IOUtils.closeQuietly(dbOptions);
+
try {
FileUtils.deleteDirectory(instanceBasePath);
} catch (IOException ioex) {
- LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath);
+ LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath, ioex);
}
}
@@ -245,14 +256,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// hold the db lock while operation on the db to guard us against async db disposal
synchronized (asyncSnapshotLock) {
- if (kvStateInformation.isEmpty()) {
- LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
- " . Returning null.");
+ if (db != null) {
- return new DoneFuture<>(null);
- }
+ if (kvStateInformation.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
+ " . Returning null.");
+ }
+
+ return new DoneFuture<>(null);
+ }
- if (db != null) {
snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
} else {
throw new IOException("RocksDB closed.");
@@ -328,9 +342,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private Snapshot snapshot;
private ReadOptions readOptions;
+ private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
+
private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
private DataOutputView outputView;
- private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
private KeyGroupsStateHandle snapshotResultStateHandle;
RocksDBSnapshotOperation(
@@ -401,26 +416,26 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* 5) Release the snapshot object for RocksDB and clean up.
- *
*/
public void releaseSnapshotResources(boolean canceled) {
+
if (null != kvStateIterators) {
for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
- kvStateIterator.f0.close();
+ IOUtils.closeQuietly(kvStateIterator.f0);
}
kvStateIterators = null;
}
if (null != snapshot) {
- if(null != stateBackend.db) {
+ if (null != stateBackend.db) {
stateBackend.db.releaseSnapshot(snapshot);
}
- snapshot.close();
+ IOUtils.closeQuietly(snapshot);
snapshot = null;
}
if (null != readOptions) {
- readOptions.close();
+ IOUtils.closeQuietly(readOptions);
readOptions = null;
}
@@ -477,8 +492,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
//retrieve iterator for this k/v states
readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);
- RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
- kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
+
+ kvStateIterators.add(
+ new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId));
+
++kvStateId;
}
@@ -493,12 +510,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
byte[] previousKey = null;
byte[] previousValue = null;
- List<Tuple2<RocksIterator, Integer>> kvStateIteratorsHandover = this.kvStateIterators;
- this.kvStateIterators = null;
-
// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
- kvStateIteratorsHandover, stateBackend.keyGroupPrefixBytes)) {
+ kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
+
+ // handover complete, null out to prevent double close
+ kvStateIterators = null;
//preamble: setup with first key-group as our lookahead
if (mergeIterator.isValid()) {
@@ -584,7 +601,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
private static void checkInterrupted() throws InterruptedException {
- if(Thread.currentThread().isInterrupted()) {
+ if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("RocksDB snapshot interrupted.");
}
}
@@ -674,7 +691,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
} finally {
if (currentStateHandleInStream != null) {
rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
- currentStateHandleInStream.close();
+ IOUtils.closeQuietly(currentStateHandleInStream);
}
}
}
@@ -778,7 +795,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* that we checkpointed, i.e. is already in the map of column families.
*/
@SuppressWarnings("rawtypes, unchecked")
- protected <N, S> ColumnFamilyHandle getColumnFamily(StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) {
+ protected <N, S> ColumnFamilyHandle getColumnFamily(
+ StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
kvStateInformation.get(descriptor.getName());
@@ -790,12 +808,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
descriptor.getSerializer());
if (stateInfo != null) {
- if (!newMetaInfo.isCompatibleWith(stateInfo.f1)) {
- throw new RuntimeException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
+ if (newMetaInfo.isCompatibleWith(stateInfo.f1)) {
+ stateInfo.f1 = newMetaInfo;
+ return stateInfo.f0;
+ } else {
+ throw new IOException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
" trying access with " + newMetaInfo);
}
- stateInfo.f1 = newMetaInfo;
- return stateInfo.f0;
}
ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
@@ -809,7 +828,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
rawAccess.put(descriptor.getName(), tuple);
return columnFamily;
} catch (RocksDBException e) {
- throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
+ throw new IOException("Error creating ColumnFamilyHandle.", e);
}
}
@@ -866,21 +885,20 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator.
* Used by #MergeIterator.
*/
- static final class MergeIterator {
+ static final class MergeIterator implements AutoCloseable {
/**
- *
- * @param iterator The #RocksIterator to wrap .
+ * @param iterator The #RocksIterator to wrap.
* @param kvStateId Id of the K/V state to which this iterator belongs.
*/
- public MergeIterator(RocksIterator iterator, int kvStateId) {
+ MergeIterator(RocksIterator iterator, int kvStateId) {
this.iterator = Preconditions.checkNotNull(iterator);
this.currentKey = iterator.key();
this.kvStateId = kvStateId;
}
- private byte[] currentKey;
private final RocksIterator iterator;
+ private byte[] currentKey;
private final int kvStateId;
public byte[] getCurrentKey() {
@@ -899,8 +917,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return kvStateId;
}
+ @Override
public void close() {
- this.iterator.close();
+ IOUtils.closeQuietly(iterator);
}
}
@@ -908,7 +927,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
* The resulting iteration sequence is ordered by (key-group, kv-state).
*/
- static final class RocksDBMergeIterator implements Closeable {
+ static final class RocksDBMergeIterator implements AutoCloseable {
private final PriorityQueue<MergeIterator> heap;
private final int keyGroupPrefixByteCount;
@@ -943,20 +962,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
if (kvStateIterators.size() > 0) {
- this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+ PriorityQueue<MergeIterator> iteratorPriorityQueue =
+ new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
- RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
+ final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
rocksIterator.seekToFirst();
if (rocksIterator.isValid()) {
- heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+ iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
} else {
- rocksIterator.close();
+ IOUtils.closeQuietly(rocksIterator);
}
}
kvStateIterators.clear();
+ this.heap = iteratorPriorityQueue;
this.valid = !heap.isEmpty();
this.currentSubIterator = heap.poll();
} else {
@@ -991,7 +1012,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
detectNewKeyGroup(oldKey);
}
} else {
- rocksIterator.close();
+ IOUtils.closeQuietly(rocksIterator);
if (heap.isEmpty()) {
currentSubIterator = null;
@@ -1082,16 +1103,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Override
public void close() {
+ IOUtils.closeQuietly(currentSubIterator);
+ currentSubIterator = null;
- if (null != currentSubIterator) {
- currentSubIterator.close();
- currentSubIterator = null;
- }
-
- for (MergeIterator iterator : heap) {
- iterator.close();
- }
-
+ IOUtils.closeAllQuietly(heap);
heap.clear();
}
}
@@ -1148,7 +1163,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// the EOFException will get us out of this...
while (true) {
byte mappingByte = inputView.readByte();
- ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte), null);
+ ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte),null);
+
byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);
http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1e5620f..3bfe742 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -63,6 +63,7 @@ import static java.util.Objects.requireNonNull;
* {@link #setOptions(OptionsFactory)}.
*/
public class RocksDBStateBackend extends AbstractStateBackend {
+
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
@@ -104,10 +105,6 @@ public class RocksDBStateBackend extends AbstractStateBackend {
/** The options factory to create the RocksDB options in the cluster */
private OptionsFactory optionsFactory;
- /** The options from the options factory, cached */
- private transient DBOptions dbOptions;
- private transient ColumnFamilyOptions columnOptions;
-
/** Whether we already lazily initialized our local storage directories. */
private transient boolean isInitialized = false;
@@ -392,39 +389,33 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* Gets the RocksDB {@link DBOptions} to be used for all RocksDB instances.
*/
public DBOptions getDbOptions() {
- if (dbOptions == null) {
- // initial options from pre-defined profile
- DBOptions opt = predefinedOptions.createDBOptions();
+ // initial options from pre-defined profile
+ DBOptions opt = predefinedOptions.createDBOptions();
- // add user-defined options, if specified
- if (optionsFactory != null) {
- opt = optionsFactory.createDBOptions(opt);
- }
+ // add user-defined options, if specified
+ if (optionsFactory != null) {
+ opt = optionsFactory.createDBOptions(opt);
+ }
- // add necessary default options
- opt = opt.setCreateIfMissing(true);
+ // add necessary default options
+ opt = opt.setCreateIfMissing(true);
- dbOptions = opt;
- }
- return dbOptions;
+ return opt;
}
/**
* Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances.
*/
public ColumnFamilyOptions getColumnOptions() {
- if (columnOptions == null) {
- // initial options from pre-defined profile
- ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
-
- // add user-defined options, if specified
- if (optionsFactory != null) {
- opt = optionsFactory.createColumnOptions(opt);
- }
+ // initial options from pre-defined profile
+ ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
- columnOptions = opt;
+ // add user-defined options, if specified
+ if (optionsFactory != null) {
+ opt = optionsFactory.createColumnOptions(opt);
}
- return columnOptions;
+
+ return opt;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 9524352..463dd44 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -261,17 +261,18 @@ public class RocksDBStateBackendConfigTest {
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
- DBOptions opt1 = rocksDbBackend.getDbOptions();
- DBOptions opt2 = rocksDbBackend.getDbOptions();
+ try (
+ DBOptions optCreated = rocksDbBackend.getDbOptions();
+ DBOptions optReference = new DBOptions();
+ ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
- assertEquals(opt1, opt2);
+ // check that our instance uses something that we configured
+ assertEquals(true, optCreated.disableDataSync());
+ // just ensure that we picked an option that actually differs from the reference.
+ assertEquals(false, optReference.disableDataSync());
- ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions();
- ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions();
-
- assertEquals(columnOpt1, columnOpt2);
-
- assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle());
+ assertEquals(CompactionStyle.LEVEL, colCreated.compactionStyle());
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 7c0291c..85af982 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -88,9 +88,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
public void close() throws IOException {
synchronized (getSynchronizationLock()) {
- for (Closeable closeable : closeableToRef.keySet()) {
- IOUtils.closeQuietly(closeable);
- }
+ IOUtils.closeAllQuietly(closeableToRef.keySet());
closeableToRef.clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 11c06a8..0bdc13a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.util;
import org.slf4j.Logger;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -166,7 +165,7 @@ public final class IOUtils {
// ------------------------------------------------------------------------
/**
- * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+ * Close the AutoCloseable objects and <b>ignore</b> any {@link Exception} or
* null pointers. Must only be used for cleanup in exception handlers.
*
* @param log
@@ -174,12 +173,12 @@ public final class IOUtils {
* @param closeables
* the objects to close
*/
- public static void cleanup(final Logger log, final java.io.Closeable... closeables) {
- for (java.io.Closeable c : closeables) {
+ public static void cleanup(final Logger log, final AutoCloseable... closeables) {
+ for (AutoCloseable c : closeables) {
if (c != null) {
try {
c.close();
- } catch (IOException e) {
+ } catch (Exception e) {
if (log != null && log.isDebugEnabled()) {
log.debug("Exception in closing " + c, e);
}
@@ -216,9 +215,48 @@ public final class IOUtils {
}
/**
+ * Closes all {@link AutoCloseable} objects in the parameter. Exceptions thrown by close() are collected and
+ * rethrown only after close() has been called on every object.
+ *
+ * @param closeables iterable with closeables to close.
+ * @throws Exception collected exceptions that occurred during closing
+ */
+ public static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {
+ if (null != closeables) {
+
+ Exception collectedExceptions = null;
+
+ for (AutoCloseable closeable : closeables) {
+ try {
+ if (null != closeable) {
+ closeable.close();
+ }
+ } catch (Exception e) {
+ collectedExceptions = ExceptionUtils.firstOrSuppressed(collectedExceptions, e);
+ }
+ }
+
+ if (null != collectedExceptions) {
+ throw collectedExceptions;
+ }
+ }
+ }
+
+ /**
+ * Closes all elements in the iterable with closeQuietly().
+ */
+ public static void closeAllQuietly(Iterable<? extends AutoCloseable> closeables) {
+ if (null != closeables) {
+ for (AutoCloseable closeable : closeables) {
+ closeQuietly(closeable);
+ }
+ }
+ }
+
+ /**
* <p><b>Important:</b> This method is expected to never throw an exception.
*/
- public static void closeQuietly(Closeable closeable) {
+ public static void closeQuietly(AutoCloseable closeable) {
try {
if (closeable != null) {
closeable.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 6d075db..3fda430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -335,6 +335,12 @@ public class StateAssignmentOperation {
List<OperatorStateHandle>[] chainParallelOpStates, ChainedStateHandle<OperatorStateHandle> chainOpState) {
if (null != chainOpState) {
+
+ int chainLength = chainOpState.getLength();
+ Preconditions.checkState(chainLength >= chainParallelOpStates.length,
+ "Found more states than operators in the chain. Chain length: " + chainLength +
+ ", States: " + chainParallelOpStates.length);
+
for (int chainIdx = 0; chainIdx < chainParallelOpStates.length; ++chainIdx) {
OperatorStateHandle operatorState = chainOpState.get(chainIdx);
http://git-wip-us.apache.org/repos/asf/flink/blob/cd9115ff/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index c560ab0..b4bf664 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -53,6 +53,7 @@ import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -1052,7 +1053,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.value();
fail("should recognize wrong serializers");
- } catch (RuntimeException e) {
+ } catch (IOException e) {
if (!e.getMessage().contains("Trying to access state using wrong")) {
fail("wrong exception " + e);
}
@@ -1103,7 +1104,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.get();
fail("should recognize wrong serializers");
- } catch (RuntimeException e) {
+ } catch (IOException e) {
if (!e.getMessage().contains("Trying to access state using wrong")) {
fail("wrong exception " + e);
}
@@ -1156,7 +1157,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.get();
fail("should recognize wrong serializers");
- } catch (RuntimeException e) {
+ } catch (IOException e) {
if (!e.getMessage().contains("Trying to access state using wrong ")) {
fail("wrong exception " + e);
}