You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:36 UTC
[18/27] flink git commit: [FLINK-4381] Refactor State to Prepare For
Key-Group State Backends
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
new file mode 100644
index 0000000..9b308a3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Handle to the non-partitioned states for the operators in an operator chain.
+ */
+public class ChainedStateHandle<T extends StateObject> implements StateObject {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The state handles for all operators in the chain */
+ private final List<? extends T> operatorStateHandles;
+
+ /**
+ * Wraps a list to the state handles for the operators in a chain. Individual state handles can be null.
+ *
+ * @param operatorStateHandles list with the state handles for the states in the operator chain.
+ */
+ public ChainedStateHandle(List<? extends T> operatorStateHandles) {
+ this.operatorStateHandles = Preconditions.checkNotNull(operatorStateHandles);
+ }
+
+ /**
+ * Checks whether no state handles are present. Notice that this can be true even if {@link #getLength()} is
+ * greater than zero, because state handles can be null.
+ *
+ * @return true if there are no state handles for any operator.
+ */
+ public boolean isEmpty() {
+ for (T state : operatorStateHandles) {
+ if (state != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns the length of the operator chain. This can be different from the number of operator state handles,
+ * because some operators in the chain can have no state and thus their state handle can be null.
+ *
+ * @return length of the operator chain
+ */
+ public int getLength() {
+ return operatorStateHandles.size();
+ }
+
+ /**
+ * Get the state handle for a single operator in the operator chain by its index.
+ *
+ * @param index the index in the operator chain
+ * @return state handle to the operator at the given position in the operator chain. can be null.
+ */
+ public T get(int index) {
+ return operatorStateHandles.get(index);
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ StateUtil.bestEffortDiscardAllStateObjects(operatorStateHandles);
+ }
+
+ @Override
+ public long getStateSize() throws Exception {
+ long sumStateSize = 0;
+
+ if (operatorStateHandles != null) {
+ for (T state : operatorStateHandles) {
+ if (state != null) {
+ sumStateSize += state.getStateSize();
+ }
+ }
+ }
+
+ // State size as sum of all state sizes
+ return sumStateSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ChainedStateHandle<?> that = (ChainedStateHandle<?>) o;
+
+ return operatorStateHandles.equals(that.operatorStateHandles);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return operatorStateHandles.hashCode();
+ }
+
+ public static <T extends StateObject> ChainedStateHandle<T> wrapSingleHandle(T stateHandleToWrap) {
+ return new ChainedStateHandle<T>(Collections.singletonList(stateHandleToWrap));
+ }
+
+ @Override
+ public void close() throws IOException {
+ StateUtil.bestEffortCloseAllStateObjects(operatorStateHandles);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
index 280746d..9ee4b90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
@@ -56,11 +56,11 @@ public class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> {
}
@Override
- public void setup(int numberKeyGroups) {
- Preconditions.checkArgument(numberKeyGroups > 0, "The number of key groups has to be " +
+ public void setup(int numberOfKeygroups) {
+ Preconditions.checkArgument(numberOfKeygroups > 0, "The number of key groups has to be " +
"greater than 0. Use setMaxParallelism() to specify the number of key " +
"groups.");
- this.numberKeyGroups = numberKeyGroups;
+ this.numberKeyGroups = numberOfKeygroups;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
new file mode 100644
index 0000000..de42bdb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * This class defines a range of key-group indexes. Key-groups are the granularity into which the keyspace of a job
+ * is partitioned for keyed state-handling in state backends. The boundaries of the range are inclusive.
+ */
+public class KeyGroupRange implements Iterable<Integer>, Serializable {
+
+ /** The empty key-group range */
+ public static final KeyGroupRange EMPTY_KEY_GROUP = new KeyGroupRange();
+
+ private final int startKeyGroup;
+ private final int endKeyGroup;
+
+ /**
+ * Empty KeyGroup Constructor
+ */
+ private KeyGroupRange() {
+ this.startKeyGroup = 0;
+ this.endKeyGroup = -1;
+ }
+
+ /**
+ * Defines the range [startKeyGroup, endKeyGroup]
+ *
+ * @param startKeyGroup start of the range (inclusive)
+ * @param endKeyGroup end of the range (inclusive)
+ */
+ public KeyGroupRange(int startKeyGroup, int endKeyGroup) {
+ Preconditions.checkArgument(startKeyGroup >= 0);
+ Preconditions.checkArgument(startKeyGroup <= endKeyGroup);
+ this.startKeyGroup = startKeyGroup;
+ this.endKeyGroup = endKeyGroup;
+ Preconditions.checkArgument(getNumberOfKeyGroups() >= 0, "Potential overflow detected.");
+ }
+
+
+ /**
+ * Checks whether or not a single key-group is contained in the range.
+ *
+ * @param keyGroup Key-group to check for inclusion.
+ * @return True, only if the key-group is in the range.
+ */
+ public boolean contains(int keyGroup) {
+ return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
+ }
+
+ /**
+ * Creates a range that represents the intersection between this range and the given range.
+ *
+ * @param other A KeyGroupRange to intersect.
+ * @return Key-group range that is the intersection between this and the given key-group range.
+ */
+ public KeyGroupRange getIntersection(KeyGroupRange other) {
+ int start = Math.max(startKeyGroup, other.startKeyGroup);
+ int end = Math.min(endKeyGroup, other.endKeyGroup);
+ return start <= end ? new KeyGroupRange(start, end) : EMPTY_KEY_GROUP;
+ }
+
+ /**
+ *
+ * @return The number of key-groups in the range
+ */
+ public int getNumberOfKeyGroups() {
+ return 1 + endKeyGroup - startKeyGroup;
+ }
+
+ /**
+ *
+ * @return The first key-group in the range.
+ */
+ public int getStartKeyGroup() {
+ return startKeyGroup;
+ }
+
+ /**
+ *
+ * @return The last key-group in the range.
+ */
+ public int getEndKeyGroup() {
+ return endKeyGroup;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof KeyGroupRange)) {
+ return false;
+ }
+
+ KeyGroupRange that = (KeyGroupRange) o;
+ return startKeyGroup == that.startKeyGroup && endKeyGroup == that.endKeyGroup;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = startKeyGroup;
+ result = 31 * result + endKeyGroup;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "KeyGroupRange{" +
+ "startKeyGroup=" + startKeyGroup +
+ ", endKeyGroup=" + endKeyGroup +
+ '}';
+ }
+
+ @Override
+ public Iterator<Integer> iterator() {
+ return new KeyGroupIterator();
+ }
+
+ private final class KeyGroupIterator implements Iterator<Integer> {
+
+ public KeyGroupIterator() {
+ this.iteratorPos = 0;
+ }
+
+ private int iteratorPos;
+
+ @Override
+ public boolean hasNext() {
+ return iteratorPos < getNumberOfKeyGroups();
+ }
+
+ @Override
+ public Integer next() {
+ int rv = startKeyGroup + iteratorPos;
+ ++iteratorPos;
+ return rv;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Unsupported by this iterator!");
+ }
+ }
+
+ /**
+ * Factory method that also handles creation of empty key-groups.
+ *
+ * @param startKeyGroup start of the range (inclusive)
+ * @param endKeyGroup end of the range (inclusive)
+ * @return the key-group from start to end or an empty key-group range.
+ */
+ public static KeyGroupRange of(int startKeyGroup, int endKeyGroup) {
+ return startKeyGroup <= endKeyGroup ? new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP;
+ }
+
+ /**
+ * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
+ * parallelism.
+ *
+ * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid integer overflow in this method. If we ever want
+ * to go beyond this boundary, this method must perform arithmetic on long values.
+ *
+ * @param maxParallelism Maximal parallelism that the job was initially created with.
+ * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
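+ * @param operatorIndex Index of the operator for which the range is computed. 0 <= operatorIndex < parallelism.
+ * @return The range of key-groups assigned to the operator with the given index.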
+ */
+ public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+ int maxParallelism,
+ int parallelism,
+ int operatorIndex) {
+
+ int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
+ int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
+ return new KeyGroupRange(start, end);
+ }
+
+ /**
+ * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
+ * parallelism.
+ *
+ * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid integer overflow in this method. If we ever want
+ * to go beyond this boundary, this method must perform arithmetic on long values.
+ *
+ * @param maxParallelism Maximal parallelism that the job was initially created with.
+ * 0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold.
+ * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
+ * @param keyGroupId Id of a key-group. 0 <= keyGroupID < maxParallelism.
+ * @return The index of the operator to which elements from the given key-group should be routed under the given
+ * parallelism and maxParallelism.
+ */
+ public static final int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
+ return keyGroupId * parallelism / maxParallelism;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
new file mode 100644
index 0000000..4f0a82b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+
+
+/**
+ * This class combines a key-group range with offsets that correspond to the key-groups in the range.
+ */
+public class KeyGroupRangeOffsets implements Iterable<Tuple2<Integer, Long>> , Serializable {
+
+ /** the range of key-groups */
+ private final KeyGroupRange keyGroupRange;
+
+ /** the aligned array of offsets for the key-groups */
+ private final long[] offsets;
+
+ /**
+ * Creates key-group range with offsets from the given key-group range. The order of given offsets must be aligned
+ * with respect to the key-groups in the range.
+ *
+ * @param keyGroupRange The range of key-groups.
+ * @param offsets The aligned array of offsets for the given key-groups.
+ */
+ public KeyGroupRangeOffsets(KeyGroupRange keyGroupRange, long[] offsets) {
+ this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+ this.offsets = Preconditions.checkNotNull(offsets);
+ Preconditions.checkArgument(offsets.length == keyGroupRange.getNumberOfKeyGroups());
+ }
+
+ /**
+ * Creates key-group range with offsets from the given start key-group to end key-group. The order of given offsets
+ * must be aligned with respect to the key-groups in the range.
+ *
+ * @param rangeStart Start key-group of the range (inclusive)
+ * @param rangeEnd End key-group of the range (inclusive)
+ * @param offsets The aligned array of offsets for the given key-groups.
+ */
+ public KeyGroupRangeOffsets(int rangeStart, int rangeEnd, long[] offsets) {
+ this(KeyGroupRange.of(rangeStart, rangeEnd), offsets);
+ }
+
+ /**
+ * Creates key-group range with offsets from the given start key-group to end key-group.
+ * All offsets are initially zero.
+ *
+ * @param rangeStart Start key-group of the range (inclusive)
+ * @param rangeEnd End key-group of the range (inclusive)
+ */
+ public KeyGroupRangeOffsets(int rangeStart, int rangeEnd) {
+ this(KeyGroupRange.of(rangeStart, rangeEnd));
+ }
+
+ /**
+ * Creates key-group range with offsets for the given key-group range, where all offsets are initially zero.
+ *
+ * @param keyGroupRange The range of key-groups.
+ */
+ public KeyGroupRangeOffsets(KeyGroupRange keyGroupRange) {
+ this(keyGroupRange, new long[keyGroupRange.getNumberOfKeyGroups()]);
+ }
+
+ /**
+ * Returns the offset for the given key-group. The key-group must be contained in the range.
+ *
+ * @param keyGroup Key-group for which we query the offset. Key-group must be contained in the range.
+ * @return The offset for the given key-group which must be contained in the range.
+ */
+ public long getKeyGroupOffset(int keyGroup) {
+ return offsets[computeKeyGroupIndex(keyGroup)];
+ }
+
+ /**
+ * Sets the offset for the given key-group. The key-group must be contained in the range.
+ *
+ * @param keyGroup Key-group for which we set the offset. Must be contained in the range.
+ * @param offset Offset for the key-group.
+ */
+ public void setKeyGroupOffset(int keyGroup, long offset) {
+ offsets[computeKeyGroupIndex(keyGroup)] = offset;
+ }
+
+ /**
+ * Returns a key-group range with offsets which is the intersection of the internal key-group range with the given
+ * key-group range.
+ *
+ * @param keyGroupRange Key-group range to intersect with the internal key-group range.
+ * @return The key-group range with offsets for the intersection of the internal key-group range with the given
+ * key-group range.
+ */
+ public KeyGroupRangeOffsets getIntersection(KeyGroupRange keyGroupRange) {
+ Preconditions.checkNotNull(keyGroupRange);
+ KeyGroupRange intersection = this.keyGroupRange.getIntersection(keyGroupRange);
+ long[] subOffsets = new long[intersection.getNumberOfKeyGroups()];
+ if(subOffsets.length > 0) {
+ System.arraycopy(
+ offsets,
+ computeKeyGroupIndex(intersection.getStartKeyGroup()),
+ subOffsets,
+ 0,
+ subOffsets.length);
+ }
+ return new KeyGroupRangeOffsets(intersection, subOffsets);
+ }
+
+ public KeyGroupRange getKeyGroupRange() {
+ return keyGroupRange;
+ }
+
+ @Override
+ public Iterator<Tuple2<Integer, Long>> iterator() {
+ return new KeyGroupOffsetsIterator();
+ }
+
+ private int computeKeyGroupIndex(int keyGroup) {
+ return keyGroup - keyGroupRange.getStartKeyGroup();
+ }
+
+ /**
+ * Iterator for the Key-group/Offset pairs.
+ */
+ private final class KeyGroupOffsetsIterator implements Iterator<Tuple2<Integer, Long>> {
+
+ public KeyGroupOffsetsIterator() {
+ this.keyGroupIterator = keyGroupRange.iterator();
+ }
+
+ private final Iterator<Integer> keyGroupIterator;
+
+ @Override
+ public boolean hasNext() {
+ return keyGroupIterator.hasNext();
+ }
+
+ @Override
+ public Tuple2<Integer, Long> next() {
+ Integer currentKeyGroup = keyGroupIterator.next();
+ Tuple2<Integer,Long> result = new Tuple2<>(
+ currentKeyGroup,
+ offsets[currentKeyGroup - keyGroupRange.getStartKeyGroup()]);
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Unsupported by this iterator!");
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof KeyGroupRangeOffsets)) {
+ return false;
+ }
+
+ KeyGroupRangeOffsets that = (KeyGroupRangeOffsets) o;
+
+ if (keyGroupRange != null ? !keyGroupRange.equals(that.keyGroupRange) : that.keyGroupRange != null) {
+ return false;
+ }
+ return Arrays.equals(offsets, that.offsets);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = keyGroupRange != null ? keyGroupRange.hashCode() : 0;
+ result = 31 * result + Arrays.hashCode(offsets);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "KeyGroupRangeOffsets{" +
+ "keyGroupRange=" + keyGroupRange +
+ ", offsets=" + Arrays.toString(offsets) +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
new file mode 100644
index 0000000..0a36f92
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A handle to the partitioned stream operator state after it has been checkpointed. This state
+ * consists of a range of key group snapshots. A key group is a subset of the available
+ * key space. The key groups are identified by their key group indices.
+ */
+public class KeyGroupsStateHandle implements StateObject {
+
+ private static final long serialVersionUID = -8070326169926626355L;
+
+ /** Range of key-groups with their respective offsets in the stream state */
+ private final KeyGroupRangeOffsets groupRangeOffsets;
+
+ /** Inner stream handle to the actual states of the key-groups in the range */
+ private final StreamStateHandle stateHandle;
+
+ /**
+ *
+ * @param groupRangeOffsets range of key-group ids that are in the state of this handle
+ * @param streamStateHandle handle to the actual state of the key-groups
+ */
+ public KeyGroupsStateHandle(KeyGroupRangeOffsets groupRangeOffsets, StreamStateHandle streamStateHandle) {
+ Preconditions.checkNotNull(groupRangeOffsets);
+ Preconditions.checkNotNull(streamStateHandle);
+
+ this.groupRangeOffsets = groupRangeOffsets;
+ this.stateHandle = streamStateHandle;
+ }
+
+ /**
+ *
+ * @return iterable over the key-group range for the key-group state referenced by this handle
+ */
+ public Iterable<Integer> keyGroups() {
+ return groupRangeOffsets.getKeyGroupRange();
+ }
+
+
+ /**
+ *
+ * @param keyGroupId the id of a key-group
+ * @return true if the provided key-group id is contained in the key-group range of this handle
+ */
+ public boolean containsKeyGroup(int keyGroupId) {
+ return groupRangeOffsets.getKeyGroupRange().contains(keyGroupId);
+ }
+
+ /**
+ *
+ * @param keyGroupId the id of a key-group. the id must be contained in the range of this handle.
+ * @return offset to the position of data for the provided key-group in the stream referenced by this state handle
+ */
+ public long getOffsetForKeyGroup(int keyGroupId) {
+ return groupRangeOffsets.getKeyGroupOffset(keyGroupId);
+ }
+
+ /**
+ *
+ * @param keyGroupRange a key group range to intersect.
+ * @return key-group state over a range that is the intersection between this handle's key-group range and the
+ * provided key-group range.
+ */
+ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange) {
+ return new KeyGroupsStateHandle(groupRangeOffsets.getIntersection(keyGroupRange), stateHandle);
+ }
+
+ /**
+ *
+ * @return the internal key-group range to offsets metadata
+ */
+ public KeyGroupRangeOffsets getGroupRangeOffsets() {
+ return groupRangeOffsets;
+ }
+
+ /**
+ *
+ * @return number of key-groups in the key-group range of this handle
+ */
+ public int getNumberOfKeyGroups() {
+ return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups();
+ }
+
+ /**
+ *
+ * @return the inner stream state handle to the actual key-group states
+ */
+ public StreamStateHandle getStateHandle() {
+ return stateHandle;
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ stateHandle.discardState();
+ }
+
+ @Override
+ public long getStateSize() throws Exception {
+ return stateHandle.getStateSize();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof KeyGroupsStateHandle)) {
+ return false;
+ }
+
+ KeyGroupsStateHandle that = (KeyGroupsStateHandle) o;
+
+ if (!groupRangeOffsets.equals(that.groupRangeOffsets)) {
+ return false;
+ }
+ return stateHandle.equals(that.stateHandle);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = groupRangeOffsets.hashCode();
+ result = 31 * result + stateHandle.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "KeyGroupsStateHandle{" +
+ "groupRangeOffsets=" + groupRangeOffsets +
+ ", data=" + stateHandle +
+ '}';
+ }
+
+ @Override
+ public void close() throws IOException {
+ stateHandle.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
deleted file mode 100644
index 4e7531f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A StateHandle that includes the operator states directly.
- */
-public class LocalStateHandle<T extends Serializable> implements StateHandle<T> {
-
- private static final long serialVersionUID = 2093619217898039610L;
-
- private final T state;
-
- public LocalStateHandle(T state) {
- this.state = state;
- }
-
- @Override
- public T getState(ClassLoader userCodeClassLoader) {
- // The object has been deserialized correctly before
- return state;
- }
-
- @Override
- public void discardState() {}
-
- @Override
- public long getStateSize() {
- return 0;
- }
-
- @Override
- public void close() throws IOException {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
new file mode 100644
index 0000000..d547624
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Handle to state that can be read back again via {@link #retrieveState()}.
+ */
+public interface RetrievableStateHandle<T extends Serializable> extends StateObject {
+
+ /**
+ * Retrieves the object that was previously written to state.
+ */
+ T retrieveState() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
new file mode 100644
index 0000000..e3538af
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Wrapper around a {@link StreamStateHandle} to make the referenced state object retrievable through a simple get call.
+ * This implementation expects that the object was serialized through default serialization of Java's
+ * {@link java.io.ObjectOutputStream}.
+ *
+ * @param <T> type of the retrievable object which is stored under the wrapped stream handle
+ */
+public class RetrievableStreamStateHandle<T extends Serializable> implements
+ StreamStateHandle, RetrievableStateHandle<T>, Closeable {
+
+ private static final long serialVersionUID = 314567453677355L;
+ /** wrapped inner stream state handle from which we deserialize on retrieval */
+ private final StreamStateHandle wrappedStreamStateHandle;
+
+ public RetrievableStreamStateHandle(StreamStateHandle streamStateHandle) {
+ this.wrappedStreamStateHandle = Preconditions.checkNotNull(streamStateHandle);
+ }
+
+ public RetrievableStreamStateHandle(Path filePath) {
+ Preconditions.checkNotNull(filePath);
+ this.wrappedStreamStateHandle = new FileStateHandle(filePath);
+ }
+
+ @Override
+ public T retrieveState() throws Exception {
+ try (FSDataInputStream in = openInputStream()) {
+ return InstantiationUtil.deserializeObject(in);
+ }
+ }
+
+ @Override
+ public FSDataInputStream openInputStream() throws Exception {
+ return wrappedStreamStateHandle.openInputStream();
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ wrappedStreamStateHandle.discardState();
+ }
+
+ @Override
+ public long getStateSize() throws Exception {
+ return wrappedStreamStateHandle.getStateSize();
+ }
+
+ @Override
+ public void close() throws IOException {
+ wrappedStreamStateHandle.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
index f17eb6e..39e7ed2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
@@ -20,13 +20,15 @@ package org.apache.flink.runtime.state;
import org.apache.flink.configuration.Configuration;
+import java.io.Serializable;
+
/**
* A factory to create a specific state backend. The state backend creation gets a Configuration
* object that can be used to read further config values.
*
* @param <T> The type of the state backend created.
*/
-public interface StateBackendFactory<T extends AbstractStateBackend> {
+public interface StateBackendFactory<T extends AbstractStateBackend> extends Serializable {
/**
* Creates the state backend, optionally using the given configuration.
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
new file mode 100644
index 0000000..3c5157e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+
+/**
+ * Helpers for {@link StateObject} related code.
+ */
+public class StateUtil {
+
+ private StateUtil() {
+ throw new AssertionError();
+ }
+
+ /**
+ * Iterates through the passed state handles and calls discardState() on each handle that is not null. All
+ * occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception.
+ *
+ * @param handlesToDiscard State handles to discard. Passed iterable is allowed to deliver null values.
+ * @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration
+ */
+ public static void bestEffortDiscardAllStateObjects(
+ Iterable<? extends StateObject> handlesToDiscard) throws Exception {
+
+ if (handlesToDiscard != null) {
+
+ Exception suppressedExceptions = null;
+
+ for (StateObject state : handlesToDiscard) {
+
+ if (state != null) {
+ try {
+ state.discardState();
+ } catch (Exception ex) {
+ //best effort to still cleanup other states and deliver exceptions in the end
+ if (suppressedExceptions == null) {
+ suppressedExceptions = new Exception(ex);
+ }
+ suppressedExceptions.addSuppressed(ex);
+ }
+ }
+ }
+
+ if (suppressedExceptions != null) {
+ throw suppressedExceptions;
+ }
+ }
+ }
+
+ /**
+ * Iterates through the passed state handles and calls close() on each handle that is not null. All
+ * occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception.
+ *
+ * @param handlesToDiscard State handles to close. Passed iterable is allowed to deliver null values.
+ * @throws IOException exception that is a collection of all suppressed exceptions that were caught during iteration
+ */
+ public static void bestEffortCloseAllStateObjects(
+ Iterable<? extends StateObject> handlesToDiscard) throws IOException {
+
+ if (handlesToDiscard != null) {
+
+ IOException suppressedExceptions = null;
+
+ for (StateObject state : handlesToDiscard) {
+
+ if (state != null) {
+ try {
+ state.close();
+ } catch (Exception ex) {
+ //best effort to still cleanup other states and deliver exceptions in the end
+ if (suppressedExceptions == null) {
+ suppressedExceptions = new IOException(ex);
+ }
+ suppressedExceptions.addSuppressed(ex);
+ }
+ }
+ }
+
+ if (suppressedExceptions != null) {
+ throw suppressedExceptions;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
deleted file mode 100644
index b130c70..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-
-/**
- * A collection of utility methods for dealing with operator state.
- */
-public class StateUtils {
-
- /**
- * Utility method to define a common generic bound to be used for setting a
- * generic state handle on a generic state carrier.
- *
- * This has no impact on runtime, since internally, it performs unchecked
- * casts. The purpose is merely to allow the use of generic interfaces
- * without resorting to raw types, by giving the compiler a common type
- * bound.
- *
- * @param op
- * The state carrier operator.
- * @param state
- * The state handle.
- * @param <T>
- * Type bound for the
- */
- public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op, StateHandle<?> state)
- throws Exception {
-
- @SuppressWarnings("unchecked")
- StatefulTask<T> typedOp = (StatefulTask<T>) op;
- @SuppressWarnings("unchecked")
- T typedHandle = (T) state;
-
- typedOp.setInitialState(typedHandle);
- }
-
- // ------------------------------------------------------------------------
-
- /** Do not instantiate */
- private StateUtils() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
index 891243b..46e4299 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,17 @@
package org.apache.flink.runtime.state;
-import java.io.InputStream;
-import java.io.Serializable;
+import org.apache.flink.core.fs.FSDataInputStream;
/**
- * A state handle that produces an input stream when resolved.
+ * A {@link StateObject} that represents state that was written to a stream. The data can be read
+ * back via {@link #openInputStream()}.
*/
-public interface StreamStateHandle extends StateHandle<InputStream> {
+public interface StreamStateHandle extends StateObject {
/**
- * Converts this stream state handle into a state handle that de-serializes
- * the stream into an object using Java's serialization mechanism.
- *
- * @return The state handle that automatically de-serializes.
+ * Returns an {@link FSDataInputStream} that can be used to read back the data that
+ * was previously written to the stream.
*/
- <T extends Serializable> StateHandle<T> toSerializableHandle();
+ FSDataInputStream openInputStream() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
deleted file mode 100644
index 0585062..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractCloseableHandle;
-import org.apache.flink.runtime.state.StateObject;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for state that is stored in a file.
- */
-public abstract class AbstractFileStateHandle extends AbstractCloseableHandle implements StateObject {
-
- private static final long serialVersionUID = 350284443258002355L;
-
- /** The path to the file in the filesystem, fully describing the file system */
- private final Path filePath;
-
- /** Cached file system handle */
- private transient FileSystem fs;
-
- /**
- * Creates a new file state for the given file path.
- *
- * @param filePath The path to the file that stores the state.
- */
- protected AbstractFileStateHandle(Path filePath) {
- this.filePath = checkNotNull(filePath);
- }
-
- /**
- * Gets the path where this handle's state is stored.
- * @return The path where this handle's state is stored.
- */
- public Path getFilePath() {
- return filePath;
- }
-
- /**
- * Discard the state by deleting the file that stores the state. If the parent directory
- * of the state is empty after deleting the state file, it is also deleted.
- *
- * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
- */
- @Override
- public void discardState() throws Exception {
- getFileSystem().delete(filePath, false);
-
- // send a call to delete the checkpoint directory containing the file. This will
- // fail (and be ignored) when some files still exist
- try {
- getFileSystem().delete(filePath.getParent(), false);
- } catch (IOException ignored) {}
- }
-
- /**
- * Gets the file system that stores the file state.
- * @return The file system that stores the file state.
- * @throws IOException Thrown if the file system cannot be accessed.
- */
- protected FileSystem getFileSystem() throws IOException {
- if (fs == null) {
- fs = FileSystem.get(filePath.toUri());
- }
- return fs;
- }
-
- /**
- * Returns the file size in bytes.
- *
- * @return The file size in bytes.
- * @throws IOException Thrown if the file system cannot be accessed.
- */
- protected long getFileSize() throws IOException {
- return getFileSystem().getFileStatus(filePath).getLen();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 0692541..51e8b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -38,8 +38,9 @@ import java.util.Map;
* @param <N> The type of the namespace in the snapshot state.
* @param <SV> The type of the state value.
*/
-public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
- extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
+public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
+ extends FileStateHandle
+ implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
private static final long serialVersionUID = 1L;
@@ -132,7 +133,7 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte
* @throws IOException Thrown if the file system cannot be accessed.
*/
@Override
- public long getStateSize() throws IOException {
- return getFileSize();
+ public void discardState() throws Exception {
+ super.discardState();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
deleted file mode 100644
index 34a1cb0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-
-/**
- * A state handle that points to state stored in a file via Java Serialization.
- *
- * @param <T> The type of state pointed to by the state handle.
- */
-public class FileSerializableStateHandle<T extends Serializable> extends AbstractFileStateHandle implements StateHandle<T> {
-
- private static final long serialVersionUID = -657631394290213622L;
-
- /**
- * Creates a new FileSerializableStateHandle pointing to state at the given file path.
- *
- * @param filePath The path to the file containing the checkpointed state.
- */
- public FileSerializableStateHandle(Path filePath) {
- super(filePath);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public T getState(ClassLoader classLoader) throws Exception {
- ensureNotClosed();
-
- try (FSDataInputStream inStream = getFileSystem().open(getFilePath())) {
- // make sure any deserialization can be aborted
- registerCloseable(inStream);
-
- ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
- return (T) ois.readObject();
- }
- }
-
- /**
- * Returns the file size in bytes.
- *
- * @return The file size in bytes.
- * @throws IOException Thrown if the file system cannot be accessed.
- */
- @Override
- public long getStateSize() throws IOException {
- return getFileSize();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
new file mode 100644
index 0000000..871e56c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link StreamStateHandle} for state that was written to a file stream. The written data is
+ * identified by the file path. The state can be read again by calling {@link #openInputStream()}.
+ */
+public class FileStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
+
+ private static final long serialVersionUID = 350284443258002355L;
+
+ /**
+ * The path to the file in the filesystem, fully describing the file system
+ */
+ private final Path filePath;
+
+ /**
+ * Cached file system handle
+ */
+ private transient FileSystem fs;
+
+ /**
+ * Creates a new file state for the given file path.
+ *
+ * @param filePath The path to the file that stores the state.
+ */
+ public FileStateHandle(Path filePath) {
+ this.filePath = requireNonNull(filePath);
+ }
+
+ /**
+ * Gets the path where this handle's state is stored.
+ *
+ * @return The path where this handle's state is stored.
+ */
+ public Path getFilePath() {
+ return filePath;
+ }
+
+ @Override
+ public FSDataInputStream openInputStream() throws Exception {
+ ensureNotClosed();
+ FSDataInputStream inputStream = getFileSystem().open(filePath);
+ registerCloseable(inputStream);
+ return inputStream;
+ }
+
+ /**
+ * Discard the state by deleting the file that stores the state. If the parent directory
+ * of the state is empty after deleting the state file, it is also deleted.
+ *
+ * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
+ */
+ @Override
+ public void discardState() throws Exception {
+ getFileSystem().delete(filePath, false);
+
+ // send a call to delete the checkpoint directory containing the file. This will
+ // fail (and be ignored) when some files still exist
+ try {
+ getFileSystem().delete(filePath.getParent(), false);
+ } catch (IOException ignored) {
+ }
+ }
+
+ /**
+ * Returns the file size in bytes.
+ *
+ * @return The file size in bytes.
+ * @throws IOException Thrown if the file system cannot be accessed.
+ */
+ @Override
+ public long getStateSize() throws IOException {
+ return getFileSystem().getFileStatus(filePath).getLen();
+ }
+
+ /**
+ * Gets the file system that stores the file state.
+ *
+ * @return The file system that stores the file state.
+ * @throws IOException Thrown if the file system cannot be accessed.
+ */
+ private FileSystem getFileSystem() throws IOException {
+ if (fs == null) {
+ fs = FileSystem.get(filePath.toUri());
+ }
+ return fs;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FileStateHandle)) {
+ return false;
+ }
+
+ FileStateHandle that = (FileStateHandle) o;
+ return filePath.equals(that.filePath);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return filePath.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
deleted file mode 100644
index 5bfb4ee..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-
-/**
- * A state handle that points to state in a file system, accessible as an input stream.
- */
-public class FileStreamStateHandle extends AbstractFileStateHandle implements StreamStateHandle {
-
- private static final long serialVersionUID = -6826990484549987311L;
-
- /**
- * Creates a new FileStreamStateHandle pointing to state at the given file path.
- *
- * @param filePath The path to the file containing the checkpointed state.
- */
- public FileStreamStateHandle(Path filePath) {
- super(filePath);
- }
-
- @Override
- public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
- ensureNotClosed();
-
- InputStream inStream = getFileSystem().open(getFilePath());
- // make sure the state handle is cancelable
- registerCloseable(inStream);
-
- return inStream;
- }
-
- /**
- * Returns the file size in bytes.
- *
- * @return The file size in bytes.
- * @throws IOException Thrown if the file system cannot be accessed.
- */
- @Override
- public long getStateSize() throws IOException {
- return getFileSize();
- }
-
- @Override
- public <T extends Serializable> StateHandle<T> toSerializableHandle() {
- FileSerializableStateHandle<T> handle = new FileSerializableStateHandle<>(getFilePath());
-
- // forward closed status
- if (isClosed()) {
- try {
- handle.close();
- } catch (IOException e) {
- // should not happen on a fresh handle, but forward anyways
- throw new RuntimeException(e);
- }
- }
-
- return handle;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 61cf741..a3f4682 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -32,15 +32,12 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
@@ -294,24 +291,6 @@ public class FsStateBackend extends AbstractStateBackend {
}
@Override
- public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
- S state, long checkpointID, long timestamp) throws Exception
- {
- checkFileSystemInitialized();
-
- Path checkpointDir = createCheckpointDirPath(checkpointID);
- int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
-
- FsCheckpointStateOutputStream stream =
- new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
-
- try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
- os.writeObject(state);
- return stream.closeAndGetHandle().toSerializableHandle();
- }
- }
-
- @Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
checkFileSystemInitialized();
@@ -520,6 +499,11 @@ public class FsStateBackend extends AbstractStateBackend {
}
}
+ @Override
+ public void sync() throws IOException {
+ outStream.sync();
+ }
+
/**
* If the stream is only closed, we remove the produced file (cleanup through the auto close
* feature, for example). This method throws no exception if the deletion fails, but only
@@ -559,7 +543,7 @@ public class FsStateBackend extends AbstractStateBackend {
flush();
outStream.close();
closed = true;
- return new FileStreamStateHandle(statePath);
+ return new FileStateHandle(statePath);
}
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index ba6de42..a42bec2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -18,28 +18,32 @@
package org.apache.flink.runtime.state.memory;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.AbstractCloseableHandle;
-import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.util.Preconditions;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.Serializable;
+import java.util.Arrays;
/**
* A state handle that contains stream state in a byte array.
*/
-public final class ByteStreamStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
+public class ByteStreamStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
private static final long serialVersionUID = -5280226231200217594L;
-
- /** the state data */
- private final byte[] data;
+
+ /**
+ * the state data
+ */
+ protected final byte[] data;
/**
* Creates a new ByteStreamStateHandle containing the given data.
- *
+ *
* @param data The state data.
*/
public ByteStreamStateHandle(byte[] data) {
@@ -47,17 +51,39 @@ public final class ByteStreamStateHandle extends AbstractCloseableHandle impleme
}
@Override
- public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
+ public FSDataInputStream openInputStream() throws Exception {
ensureNotClosed();
- ByteArrayInputStream stream = new ByteArrayInputStream(data);
- registerCloseable(stream);
+ FSDataInputStream inputStream = new FSDataInputStream() {
+ int index = 0;
+
+ @Override
+ public void seek(long desired) throws IOException {
+ Preconditions.checkArgument(desired >= 0 && desired < Integer.MAX_VALUE);
+ index = (int) desired;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return index;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return index < data.length ? data[index++] & 0xFF : -1;
+ }
+ };
+ registerCloseable(inputStream);
+ return inputStream;
+ }
- return stream;
+ public byte[] getData() {
+ return data;
}
@Override
- public void discardState() {}
+ public void discardState() {
+ }
@Override
public long getStateSize() {
@@ -65,19 +91,27 @@ public final class ByteStreamStateHandle extends AbstractCloseableHandle impleme
}
@Override
- public <T extends Serializable> StateHandle<T> toSerializableHandle() {
- SerializedStateHandle<T> serializableHandle = new SerializedStateHandle<T>(data);
-
- // forward the closed status
- if (isClosed()) {
- try {
- serializableHandle.close();
- } catch (IOException e) {
- // should not happen on a fresh handle, but forward anyways
- throw new RuntimeException(e);
- }
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
+ if (!(o instanceof ByteStreamStateHandle)) {
+ return false;
+ }
+
+ ByteStreamStateHandle that = (ByteStreamStateHandle) o;
+ return Arrays.equals(data, that.data);
+
+ }
+
+ @Override
+ public int hashCode() {
+ // Hash only the bytes: equals() compares nothing but data, so mixing in
+ // super.hashCode() could give equal handles different hash codes.
+ return Arrays.hashCode(data);
+ }
- return serializableHandle;
+ public static StreamStateHandle fromSerializable(Serializable value) throws IOException {
+ return new ByteStreamStateHandle(InstantiationUtil.serializeObject(value));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 7b9d21b..af84394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -27,13 +27,12 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.Serializable;
/**
* A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no
@@ -104,27 +103,6 @@ public class MemoryStateBackend extends AbstractStateBackend {
return new MemFoldingState<>(keySerializer, namespaceSerializer, stateDesc);
}
- /**
- * Serialized the given state into bytes using Java serialization and creates a state handle that
- * can re-create that state.
- *
- * @param state The state to checkpoint.
- * @param checkpointID The ID of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
- * @param <S> The type of the state.
- *
- * @return A state handle that contains the given state serialized as bytes.
- * @throws Exception Thrown, if the serialization fails.
- */
- @Override
- public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
- S state, long checkpointID, long timestamp) throws Exception
- {
- SerializedStateHandle<S> handle = new SerializedStateHandle<>(state);
- checkSize(handle.getSizeOfSerializedState(), maxStateSize);
- return new SerializedStateHandle<S>(state);
- }
-
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception
@@ -177,6 +155,14 @@ public class MemoryStateBackend extends AbstractStateBackend {
os.write(b, off, len);
}
+ @Override
+ public void flush() throws IOException {
+ os.flush();
+ }
+
+ @Override
+ public void sync() throws IOException { }
+
// --------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
deleted file mode 100644
index 4420470..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.runtime.state.AbstractCloseableHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A state handle that represents its state in serialized form as bytes.
- *
- * @param <T> The type of state represented by this state handle.
- */
-public class SerializedStateHandle<T extends Serializable> extends AbstractCloseableHandle implements StateHandle<T> {
-
- private static final long serialVersionUID = 4145685722538475769L;
-
- /** The serialized data */
- private final byte[] serializedData;
-
- /**
- * Creates a new serialized state handle, eagerly serializing the given state object.
- *
- * @param value The state object.
- * @throws IOException Thrown, if the serialization fails.
- */
- public SerializedStateHandle(T value) throws IOException {
- this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
- }
-
- /**
- * Creates a new serialized state handle, based in the given already serialized data.
- *
- * @param serializedData The serialized data.
- */
- public SerializedStateHandle(byte[] serializedData) {
- this.serializedData = serializedData;
- }
-
- @Override
- public T getState(ClassLoader classLoader) throws Exception {
- if (classLoader == null) {
- throw new NullPointerException();
- }
-
- ensureNotClosed();
- return serializedData == null ? null : InstantiationUtil.<T>deserializeObject(serializedData, classLoader);
- }
-
- /**
- * Gets the size of the serialized state.
- * @return The size of the serialized state.
- */
- public int getSizeOfSerializedState() {
- return serializedData.length;
- }
-
- /**
- * Discarding heap-memory backed state is a no-op, so this method does nothing.
- */
- @Override
- public void discardState() {}
-
- @Override
- public long getStateSize() {
- return serializedData.length;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 6958784..d54826a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
@@ -36,10 +35,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@@ -240,39 +242,21 @@ public class RuntimeEnvironment implements Environment {
@Override
public void acknowledgeCheckpoint(long checkpointId) {
- acknowledgeCheckpoint(checkpointId, null);
+ acknowledgeCheckpoint(checkpointId, null, null);
}
@Override
- public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
- // try and create a serialized version of the state handle
- SerializedValue<StateHandle<?>> serializedState;
- long stateSize;
-
- if (state == null) {
- serializedState = null;
- stateSize = 0;
- } else {
- try {
- serializedState = new SerializedValue<StateHandle<?>>(state);
- } catch (Exception e) {
- throw new RuntimeException("Failed to serialize state handle during checkpoint confirmation", e);
- }
-
- try {
- stateSize = state.getStateSize();
- }
- catch (Exception e) {
- throw new RuntimeException("Failed to fetch state handle size", e);
- }
- }
-
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+ List<KeyGroupsStateHandle> keyGroupStateHandles) {
+
AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
jobId,
executionId,
checkpointId,
- serializedState,
- stateSize);
+ chainedStateHandle,
+ keyGroupStateHandles);
jobManager.tell(message);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c98d512..73601c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -58,10 +58,11 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateUtils;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -223,9 +224,17 @@ public class Task implements Runnable {
/** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized */
private volatile ExecutorService asyncCallDispatcher;
- /** The handle to the state that the operator was initialized with. Will be set to null after the
- * initialization, to be memory friendly */
- private volatile SerializedValue<StateHandle<?>> operatorState;
+ /**
+ * The handle to the chained operator state that the task was initialized with. Will be set
+ * to null after the initialization, to be memory friendly.
+ */
+ private volatile ChainedStateHandle<StreamStateHandle> chainedOperatorState;
+
+ /**
+ * The handle to the key group state that the task was initialized with. Will be set
+ * to null after the initialization, to be memory friendly.
+ */
+ private volatile List<KeyGroupsStateHandle> keyGroupStates;
/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
private long taskCancellationInterval;
@@ -257,8 +266,9 @@ public class Task implements Runnable {
this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths());
this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
- this.operatorState = tdd.getOperatorState();
+ this.chainedOperatorState = tdd.getOperatorState();
this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
+ this.keyGroupStates = tdd.getKeyGroupState();
this.taskCancellationInterval = jobConfiguration.getLong(
ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
@@ -538,21 +548,11 @@ public class Task implements Runnable {
// the state into the task. the state is non-empty if this is an execution
// of a task that failed but had backuped state from a checkpoint
- // get our private reference onto the stack (be safe against concurrent changes)
- SerializedValue<StateHandle<?>> operatorState = this.operatorState;
-
- if (operatorState != null) {
+ if (chainedOperatorState != null || keyGroupStates != null) {
if (invokable instanceof StatefulTask) {
- try {
- StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
- StatefulTask<?> op = (StatefulTask<?>) invokable;
- StateUtils.setOperatorState(op, state);
- }
- catch (Exception e) {
- throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
- }
- }
- else {
+ StatefulTask op = (StatefulTask) invokable;
+ op.setInitialState(chainedOperatorState, keyGroupStates);
+ } else {
throw new IllegalStateException("Found operator state for a non-stateful task invokable");
}
}
@@ -560,8 +560,8 @@ public class Task implements Runnable {
// be memory and GC friendly - since the code stays in invoke() for a potentially long time,
// we clear the reference to the state handle
//noinspection UnusedAssignment
- operatorState = null;
- this.operatorState = null;
+ this.chainedOperatorState = null;
+ this.keyGroupStates = null;
// ----------------------------------------------------------------
// actual task core work
@@ -936,7 +936,7 @@ public class Task implements Runnable {
if (invokable instanceof StatefulTask) {
// build a local closure
- final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
+ final StatefulTask statefulTask = (StatefulTask) invokable;
final String taskName = taskNameWithSubtask;
Runnable runnable = new Runnable() {
@@ -977,7 +977,7 @@ public class Task implements Runnable {
if (invokable instanceof StatefulTask) {
// build a local closure
- final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
+ final StatefulTask statefulTask = (StatefulTask) invokable;
final String taskName = taskNameWithSubtask;
Runnable runnable = new Runnable() {
@@ -1192,7 +1192,6 @@ public class Task implements Runnable {
// reason, we spawn a separate thread that repeatedly interrupts the user code until
// it exits
while (executer.isAlive()) {
-
// build the stack trace of where the thread is stuck, for the log
StringBuilder bld = new StringBuilder();
StackTraceElement[] stack = executer.getStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index b585fe6..91db564 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.util.ConfigurationUtil;
import org.slf4j.Logger;
@@ -228,7 +228,7 @@ public class ZooKeeperUtils {
checkNotNull(configuration, "Configuration");
- StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
+ RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
// ZooKeeper submitted jobs root dir
String zooKeeperSubmittedJobsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
@@ -266,7 +266,7 @@ public class ZooKeeperUtils {
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH,
ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH);
- StateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
+ RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
configuration,
"completedCheckpoint");
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
new file mode 100644
index 0000000..1434f74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+
+import java.io.Serializable;
+
+/**
+ * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persist state before
+ * the state handle is written to ZooKeeper.
+ *
+ * @param <T> The type of the data that can be stored by this storage helper.
+ */
+public interface RetrievableStateStorageHelper<T extends Serializable> {
+
+ /**
+ * Stores the given state and returns a state handle to it.
+ *
+ * @param state State to be stored
+ * @return State handle to the stored state
+ * @throws Exception
+ */
+ RetrievableStateHandle<T> store(T state) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
deleted file mode 100644
index 36fb849..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.zookeeper;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-import java.io.Serializable;
-
-/**
- * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persiste state before
- * the state handle is written to ZooKeeper.
- *
- * @param <T>
- */
-public interface StateStorageHelper<T extends Serializable> {
-
- /**
- * Stores the given state and returns a state handle to it.
- *
- * @param state State to be stored
- * @return State handle to the stored state
- * @throws Exception
- */
- StateHandle<T> store(T state) throws Exception;
-}