You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/25 19:21:37 UTC
[04/12] flink git commit: [streaming] Initial rework of the operator
state interfaces
[streaming] Initial rework of the operator state interfaces
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7e24580
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7e24580
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7e24580
Branch: refs/heads/master
Commit: a7e24580a2178b27ea77e7327c39ad7e75cac0a3
Parents: d42c732
Author: Gyula Fora <gy...@apache.org>
Authored: Sun May 17 20:33:38 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Jun 25 16:38:06 2015 +0200
----------------------------------------------------------------------
.../api/common/functions/RichMapFunction.java | 2 +-
.../api/common/functions/RuntimeContext.java | 47 +++++
.../util/AbstractRuntimeUDFContext.java | 12 ++
.../flink/api/common/state/OperatorState.java | 67 ++++++
.../api/common/state/StateCheckpointer.java | 73 +++++++
.../flink/runtime/state/LocalStateHandle.java | 19 +-
.../runtime/state/PartitionedStateHandle.java | 47 +++++
.../runtime/state/PartitionedStateStore.java | 47 +++++
.../apache/flink/runtime/state/StateUtils.java | 40 ++--
.../api/checkpoint/CheckpointCommitter.java | 7 +-
.../api/datastream/IterativeDataStream.java | 2 +
.../api/datastream/StreamProjection.java | 3 +-
.../api/functions/source/FileReadFunction.java | 8 +-
.../api/functions/source/SourceFunction.java | 4 +-
.../flink/streaming/api/graph/StreamConfig.java | 21 +-
.../api/operators/AbstractStreamOperator.java | 6 +-
.../operators/AbstractUdfStreamOperator.java | 68 +++---
.../api/operators/StatefulStreamOperator.java | 4 +-
.../streaming/api/operators/StreamOperator.java | 8 +-
.../streaming/api/state/BasicCheckpointer.java | 37 ++++
.../streaming/api/state/EagerStateStore.java | 86 ++++++++
.../streaming/api/state/LazyStateStore.java | 117 ++++++++++
.../state/PartitionedStreamOperatorState.java | 126 +++++++++++
.../api/state/StreamOperatorState.java | 100 +++++++++
.../runtime/io/BlockingQueueBroker.java | 4 +-
.../runtime/io/StreamRecordWriter.java | 6 +-
.../runtime/tasks/OneInputStreamTask.java | 4 +-
.../runtime/tasks/StreamIterationHead.java | 1 -
.../streaming/runtime/tasks/StreamTask.java | 22 +-
.../runtime/tasks/StreamingRuntimeContext.java | 30 ++-
.../runtime/tasks/TwoInputStreamTask.java | 1 -
.../serialization/DeserializationSchema.java | 4 +-
.../api/state/StatefulFunctionTest.java | 211 +++++++++++++++++++
.../runtime/tasks/SourceStreamTaskTest.java | 68 +++---
.../flink/streaming/util/MockCoContext.java | 7 +-
.../flink/streaming/util/MockContext.java | 7 +-
.../StreamCheckpointingITCase.java | 118 ++++-------
.../ProcessFailureStreamingRecoveryITCase.java | 79 ++-----
38 files changed, 1244 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
index 2005be0..7adb25b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.functions.RichFunction;
public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {
private static final long serialVersionUID = 1L;
-
+
@Override
public abstract OUT map(IN value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index f68d2b0..a3b8f65 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -29,6 +29,8 @@ import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
@@ -160,4 +162,49 @@ public interface RuntimeContext {
* @return The distributed cache of the worker executing this instance.
*/
DistributedCache getDistributedCache();
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the {@link OperatorState} of this operator instance, which can be
+ * used to store and update user state in a fault tolerant fashion. The
+ * state will be initialized by the provided default value, and the
+ * {@link StateCheckpointer} will be used to draw the state snapshots.
+ *
+ * <p>
+ * When storing a {@link Serializable} state the user can omit the
+ * {@link StateCheckpointer} in which case the full state will be written as
+ * the snapshot.
+ * </p>
+ *
+ * @param defaultState
+ * Default value for the operator state. This will be returned
+ * the first time {@link OperatorState#getState()} (for every
+ * state partition) is called before
+ * {@link OperatorState#updateState(Object)}.
+ * @param checkpointer
+ * The {@link StateCheckpointer} that will be used to draw
+ * snapshots from the user state.
+ * @return The {@link OperatorState} for this instance.
+ */
+ <S,C extends Serializable> OperatorState<S> getOperatorState(S defaultState, StateCheckpointer<S,C> checkpointer);
+
+ /**
+ * Returns the {@link OperatorState} of this operator instance, which can be
+ * used to store and update user state in a fault tolerant fashion. The
+ * state will be initialized by the provided default value.
+ *
+ * <p>
+ * When storing a non-{@link Serializable} state the user needs to specify a
+ * {@link StateCheckpointer} for drawing snapshots.
+ * </p>
+ *
+ * @param defaultState
+ * Default value for the operator state. This will be returned
+ * the first time {@link OperatorState#getState()} (for every
+ * state partition) is called before
+ * {@link OperatorState#updateState(Object)}.
+ * @return The {@link OperatorState} for this instance.
+ */
+ <S extends Serializable> OperatorState<S> getOperatorState(S defaultState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 735fe8e..413565b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -33,6 +33,8 @@ import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.core.fs.Path;
/**
@@ -170,4 +172,14 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
}
return (Accumulator<V, A>) accumulator;
}
+
+ @Override
+ public <S, C extends Serializable> OperatorState<S> getOperatorState(S defaultState, StateCheckpointer<S, C> checkpointer) {
+ throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
+ }
+
+ @Override
+ public <S extends Serializable> OperatorState<S> getOperatorState(S defaultState) {
+ throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
new file mode 100644
index 0000000..5b3fa05
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * Base class for all streaming operator states. It can represent both
+ * partitioned (when state partitioning is defined in the program) or
+ * non-partitioned user states.
+ *
+ * State can be accessed and manipulated using the {@link #getState()} and
+ * {@link #updateState(Object)} methods. These calls are only valid inside
+ * the transformation call the operator represents, for instance inside
+ * {@link MapFunction#map(Object)}, and invalid in
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * or {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <T>
+ * Type of the operator state
+ */
+public interface OperatorState<T> {
+
+ /**
+ * Gets the current state for the operator. When the state is not
+ * partitioned the returned state is the same for all inputs. If state
+ * partitioning is applied the state returned depends on the current
+ * operator input, as the operator maintains an independent state for each
+ * partition.
+ *
+ * <p>
+ * {@link #getState()} returns the default state given when the state was
+ * obtained (or <code>null</code> if none) until the state is first set with
+ * {@link #updateState(Object)}.
+ * </p>
+ *
+ * @return The operator state corresponding to the current input.
+ */
+ T getState();
+
+ /**
+ * Updates the operator state accessible by {@link #getState()} to the given
+ * value. The next time {@link #getState()} is called (for the same state
+ * partition) the returned state will represent the updated value.
+ *
+ * @param state
+ * The updated state.
+ */
+ void updateState(T state);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
new file mode 100644
index 0000000..488e308
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import java.io.Serializable;
+
+/**
+ * Basic interface for creating {@link OperatorState} snapshots in stateful
+ * streaming programs.
+ *
+ * The user needs to implement the {@link #snapshotState(Object, long, long)} and
+ * {@link #restoreState(Serializable)} methods that will be called to create and restore
+ * state snapshots of the given states.
+ *
+ * <p>
+ * Note that the {@link OperatorState} is <i>synchronously</i> checkpointed.
+ * While the state is written, the state cannot be accessed or modified, so
+ * {@link #snapshotState} need not return a copy of the state, but may return a
+ * reference to it.
+ * </p>
+ *
+ * @param <S>
+ * Type of the operator state.
+ * @param <C>
+ * Type of the snapshot that will be persisted.
+ */
+public interface StateCheckpointer<S, C extends Serializable> {
+
+ /**
+ * Takes a snapshot of a given operator state. The snapshot returned will be
+ * persisted in the state backend for this job and restored upon failure.
+ * This method is called for all state partitions in case of partitioned
+ * state when creating a checkpoint.
+ *
+ * @param state
+ * The state for which the snapshot needs to be taken
+ * @param checkpointId
+ * The ID of the checkpoint.
+ * @param checkpointTimestamp
+ * The timestamp of the checkpoint, as derived by
+ * System.currentTimeMillis() on the JobManager.
+ *
+ * @return A snapshot of the operator state.
+ */
+ public C snapshotState(S state, long checkpointId, long checkpointTimestamp);
+
+ /**
+ * Restores the operator states from a given snapshot. The restored state
+ * will be loaded back into the function. In case of partitioned state, each
+ * partition is restored independently.
+ *
+ * @param stateSnapshot
+ * The state snapshot that needs to be restored.
+ * @return The state corresponding to the snapshot.
+ */
+ public S restoreState(C stateSnapshot);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index a53d8da..5ba372d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -23,36 +23,33 @@ import java.io.Serializable;
/**
* A StateHandle that includes the operator states directly.
*/
-public class LocalStateHandle implements StateHandle<Serializable> {
+public class LocalStateHandle<T extends Serializable> implements StateHandle<T> {
private static final long serialVersionUID = 2093619217898039610L;
- private final Serializable state;
+ private final T state;
- public LocalStateHandle(Serializable state) {
+ public LocalStateHandle(T state) {
this.state = state;
}
@Override
- public Serializable getState() {
+ public T getState() {
return state;
}
@Override
public void discardState() throws Exception {
}
-
- public static LocalStateHandleProvider createProvider(){
- return new LocalStateHandleProvider();
- }
- private static class LocalStateHandleProvider implements StateHandleProvider<Serializable> {
+ public static class LocalStateHandleProvider<R extends Serializable> implements
+ StateHandleProvider<R> {
private static final long serialVersionUID = 4665419208932921425L;
@Override
- public LocalStateHandle createStateHandle(Serializable state) {
- return new LocalStateHandle(state);
+ public LocalStateHandle<R> createStateHandle(R state) {
+ return new LocalStateHandle<R>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
new file mode 100644
index 0000000..4119df1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class PartitionedStateHandle implements
+ StateHandle<Map<Serializable, StateHandle<Serializable>>> {
+
+ private static final long serialVersionUID = 7505365403501402100L;
+
+ Map<Serializable, StateHandle<Serializable>> handles;
+
+ public PartitionedStateHandle(Map<Serializable, StateHandle<Serializable>> handles) {
+ this.handles = handles;
+ }
+
+ @Override
+ public Map<Serializable, StateHandle<Serializable>> getState() throws Exception {
+ return handles;
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ for (StateHandle<Serializable> handle : handles.values()) {
+ handle.discardState();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java
new file mode 100644
index 0000000..8b73edf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Interface for storing and accessing partitioned state. The interface is
+ * designed so that implementations can access state lazily.
+ *
+ * @param <S>
+ * Type of the state.
+ * @param <C>
+ * Type of the state snapshot.
+ */
+public interface PartitionedStateStore<S, C extends Serializable> {
+
+ S getStateForKey(Serializable key) throws Exception;
+
+ void setStateForKey(Serializable key, S state);
+
+ Map<Serializable, S> getPartitionedState() throws Exception;
+
+ Map<Serializable, StateHandle<C>> snapshotStates(long checkpointId, long checkpointTimestamp) throws Exception;
+
+ void restoreStates(Map<Serializable, StateHandle<C>> snapshots) throws Exception;
+
+ boolean containsKey(Serializable key);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
index fbd76ba..7977e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.state;
+import java.util.List;
+
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
/**
@@ -26,20 +28,23 @@ import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
public class StateUtils {
/**
- * Utility method to define a common generic bound to be used for setting a generic state
- * handle on a generic state carrier.
+ * Utility method to define a common generic bound to be used for setting a
+ * generic state handle on a generic state carrier.
*
- * This has no impact on runtime, since internally, it performs
- * unchecked casts. The purpose is merely to allow the use of generic interfaces without resorting
- * to raw types, by giving the compiler a common type bound.
+ * This has no impact on runtime, since internally, it performs unchecked
+ * casts. The purpose is merely to allow the use of generic interfaces
+ * without resorting to raw types, by giving the compiler a common type
+ * bound.
*
- * @param op The state carrier operator.
- * @param state The state handle.
- * @param <T> Type bound for the
+ * @param op
+ * The state carrier operator.
+ * @param state
+ * The state handle.
+ * @param <T>
+ * Type bound for the state handle.
*/
- public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op, StateHandle<?> state)
- throws Exception
- {
+ public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op,
+ StateHandle<?> state) throws Exception {
@SuppressWarnings("unchecked")
OperatorStateCarrier<T> typedOp = (OperatorStateCarrier<T>) op;
@SuppressWarnings("unchecked")
@@ -47,10 +52,15 @@ public class StateUtils {
typedOp.setInitialState(typedHandle);
}
-
-
+
+ public static List<PartitionedStateHandle> rePartitionHandles(
+ List<PartitionedStateHandle> handles, int numPartitions) {
+ return null;
+ }
+
// ------------------------------------------------------------------------
-
+
/** Do not instantiate */
- private StateUtils() {}
+ private StateUtils() {
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
index a95b540..82ef6f3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
@@ -18,6 +18,10 @@
package org.apache.flink.streaming.api.checkpoint;
+import java.io.Serializable;
+
+import org.apache.flink.runtime.state.StateHandle;
+
/**
* This interface must be implemented by functions/operations that want to receive
* a commit notification once a checkpoint has been completely acknowledged by all
@@ -32,6 +36,7 @@ public interface CheckpointCommitter {
* fail any more.
*
* @param checkpointId The ID of the checkpoint that has been completed.
+ * @param checkPointedState Handle to the state that was checkpointed with this checkpoint id.
*/
- void commitCheckpoint(long checkpointId);
+ void commitCheckpoint(long checkpointId, StateHandle<Serializable> checkPointedState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 6a48b6a..2d70d49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
/**
* The iterative data stream represents the start of an iteration in a
* {@link DataStream}.
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index 16e9deb..149d7a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.datastream;
-import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
@@ -48,6 +47,8 @@ import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.operators.StreamProject;
+import com.google.common.base.Preconditions;
+
public class StreamProjection<IN> {
private DataStream<IN> dataStream;
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
index 945f953..4f859e8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
@@ -17,6 +17,10 @@
package org.apache.flink.streaming.api.functions.source;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URI;
+
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -24,10 +28,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URI;
-
public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 17ca34d..921a33b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -18,10 +18,10 @@
package org.apache.flink.streaming.api.functions.source;
-import org.apache.flink.api.common.functions.Function;
-
import java.io.Serializable;
+import org.apache.flink.api.common.functions.Function;
+
/**
* Base interface for all stream data sources in Flink. The contract of a stream source
* is the following: When the source should start emitting elements the {@link #run} method
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 329b4dd..0784582 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
@@ -59,6 +60,7 @@ public class StreamConfig implements Serializable {
private static final String OUT_STREAM_EDGES = "outStreamEdges";
private static final String IN_STREAM_EDGES = "inStreamEdges";
private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
+ private static final String STATE_PARTITIONER = "statePartitioner";
// DEFAULT VALUES
private static final long DEFAULT_TIMEOUT = 100;
@@ -381,7 +383,6 @@ public class StreamConfig implements Serializable {
}
public void setStateHandleProvider(StateHandleProvider<?> provider) {
-
try {
InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER);
} catch (IOException e) {
@@ -398,6 +399,24 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not instantiate statehandle provider.", e);
}
}
+
+	/**
+	 * Serializes the given state partitioner {@link KeySelector} into this config.
+	 *
+	 * @param statePartitioner
+	 *            the key selector to store
+	 * @throws StreamTaskException
+	 *             if the key selector cannot be serialized
+	 */
+	public void setStatePartitioner(KeySelector<?, Serializable> statePartitioner) {
+		try {
+			InstantiationUtil.writeObjectToConfig(statePartitioner, config, STATE_PARTITIONER);
+		} catch (IOException serializationError) {
+			throw new StreamTaskException("Could not serialize state partitioner.", serializationError);
+		}
+	}
+
+	/**
+	 * Deserializes the state partitioner {@link KeySelector} stored by
+	 * {@link #setStatePartitioner(KeySelector)}.
+	 *
+	 * @param cl
+	 *            class loader used to resolve the user classes of the key selector
+	 * @return the stored key selector (NOTE(review): presumably {@code null} if
+	 *         none was set; this depends on InstantiationUtil.readObjectFromConfig)
+	 * @throws StreamTaskException
+	 *             if reading or casting the stored object fails
+	 */
+	@SuppressWarnings("unchecked")
+	public KeySelector<?, Serializable> getStatePartitioner(ClassLoader cl) {
+		KeySelector<?, Serializable> partitioner;
+		try {
+			// The cast stays inside the try so a wrong type is reported as a StreamTaskException.
+			partitioner = (KeySelector<?, Serializable>) InstantiationUtil.readObjectFromConfig(
+					config, STATE_PARTITIONER, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate state partitioner.", e);
+		}
+		return partitioner;
+	}
public void setChainStart() {
config.setBoolean(IS_CHAINED_VERTEX, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a365587..40f61d9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -19,8 +19,8 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
/**
* Base class for operators that do not contain a user-defined function.
@@ -31,7 +31,7 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
private static final long serialVersionUID = 1L;
- protected transient RuntimeContext runtimeContext;
+ protected transient StreamingRuntimeContext runtimeContext;
protected transient ExecutionConfig executionConfig;
@@ -43,7 +43,7 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
@Override
- public void setup(Output<OUT> output, RuntimeContext runtimeContext) {
+ public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext) {
this.output = output;
this.executionConfig = runtimeContext.getExecutionConfig();
this.runtimeContext = runtimeContext;
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 90b2b2f..cbcbcee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -18,20 +18,25 @@
package org.apache.flink.streaming.api.operators;
+import java.io.Serializable;
+import java.util.Map;
+
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-
-import java.io.Serializable;
+import org.apache.flink.streaming.api.state.StreamOperatorState;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
/**
- * This is used as the base class for operators that have a user-defined function.
+ * This is used as the base class for operators that have a user-defined
+ * function.
*
- * @param <OUT> The output type of the operator
- * @param <F> The type of the user function
+ * @param <OUT>
+ * The output type of the operator
+ * @param <F>
+ * The type of the user function
*/
public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
@@ -44,7 +49,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
}
@Override
- public final void setup(Output<OUT> output, RuntimeContext runtimeContext) {
+ public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext) {
super.setup(output, runtimeContext);
FunctionUtils.setFunctionRuntimeContext(userFunction, runtimeContext);
}
@@ -57,35 +62,37 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
}
@Override
- public void close() throws Exception{
+ public void close() throws Exception {
super.close();
FunctionUtils.closeFunction(userFunction);
}
+ @SuppressWarnings("unchecked")
public void restoreInitialState(Serializable state) throws Exception {
- if (userFunction instanceof Checkpointed) {
- setStateOnFunction(state, userFunction);
- }
- else {
- throw new IllegalStateException("Trying to restore state of a non-checkpointed function");
- }
+
+ Map<Serializable, StateHandle<Serializable>> snapshots = (Map<Serializable, StateHandle<Serializable>>) state;
+
+ StreamOperatorState<?, Serializable> operatorState = (StreamOperatorState<?, Serializable>) runtimeContext
+ .getOperatorState();
+
+ operatorState.restoreState(snapshots);
+
}
- public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception {
- if (userFunction instanceof Checkpointed) {
- return ((Checkpointed<?>) userFunction).snapshotState(checkpointId, timestamp);
- }
- else {
- return null;
- }
+ public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp)
+ throws Exception {
+
+ StreamOperatorState<?,?> operatorState = (StreamOperatorState<?,?>) runtimeContext.getOperatorState();
+
+ return (Serializable) operatorState.snapshotState(checkpointId, timestamp);
}
- public void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception {
+ public void confirmCheckpointCompleted(long checkpointId, long timestamp,
+ StateHandle<Serializable> checkpointedState) throws Exception {
if (userFunction instanceof CheckpointCommitter) {
try {
- ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId);
- }
- catch (Exception e) {
+ ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId, checkpointedState);
+ } catch (Exception e) {
throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e);
}
}
@@ -94,13 +101,4 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
public F getUserFunction() {
return userFunction;
}
-
- private static <T extends Serializable> void setStateOnFunction(Serializable state, Function function) {
- @SuppressWarnings("unchecked")
- T typedState = (T) state;
- @SuppressWarnings("unchecked")
- Checkpointed<T> typedFunction = (Checkpointed<T>) function;
-
- typedFunction.restoreState(typedState);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
index e171af8..343f87d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
@@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.operators;
import java.io.Serializable;
+import org.apache.flink.runtime.state.StateHandle;
+
/**
* Interface for Stream operators that can have state. This interface is used for checkpointing
* and restoring that state.
@@ -31,5 +33,5 @@ public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> {
Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
- void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception;
+ void confirmCheckpointCompleted(long checkpointId, long timestamp, StateHandle<Serializable> checkpointedState) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 05b15be..aebff5c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -17,11 +17,11 @@
package org.apache.flink.streaming.api.operators;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-
import java.io.Serializable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
/**
* Basic interface for stream operators. Implementers would implement one of
* {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
@@ -37,7 +37,7 @@ public interface StreamOperator<OUT> extends Serializable {
/**
* Initializes the {@link StreamOperator} for input and output handling.
*/
- public void setup(Output<OUT> output, RuntimeContext runtimeContext);
+ public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext);
/**
* This method is called before any elements are processed.
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
new file mode 100644
index 0000000..14d1504
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.state.StateCheckpointer;
+
+/**
+ * Pass-through {@link StateCheckpointer} for {@link Serializable} states: the
+ * state object is used directly as its own snapshot, and a snapshot is restored
+ * by returning it unchanged. No copy is made in either direction.
+ */
+public class BasicCheckpointer implements StateCheckpointer<Serializable, Serializable> {
+
+	/**
+	 * Returns the given state unchanged; the checkpoint id and timestamp are ignored.
+	 */
+	@Override
+	public Serializable snapshotState(Serializable currentState, long checkpointId,
+			long checkpointTimestamp) {
+		return currentState;
+	}
+
+	/**
+	 * Returns the given snapshot unchanged.
+	 */
+	@Override
+	public Serializable restoreState(Serializable snapshot) {
+		return snapshot;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
new file mode 100644
index 0000000..4ac01a5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.runtime.state.PartitionedStateStore;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+
+/**
+ * {@link PartitionedStateStore} that keeps the state of every partition in
+ * memory. Restored snapshots are materialized immediately in
+ * {@link #restoreStates(Map)}, and every held state is re-snapshotted on each
+ * call to {@link #snapshotStates(long, long)}.
+ *
+ * @param <S>
+ *            Type of the operator states.
+ * @param <C>
+ *            Type of the state checkpoints.
+ */
+public class EagerStateStore<S, C extends Serializable> implements PartitionedStateStore<S, C> {
+
+	protected StateCheckpointer<S, C> checkpointer;
+	protected StateHandleProvider<C> provider;
+
+	// Materialized state of every partition, keyed by partition key.
+	Map<Serializable, S> fetchedState = new HashMap<Serializable, S>();
+
+	public EagerStateStore(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
+		this.checkpointer = checkpointer;
+		this.provider = provider;
+	}
+
+	@Override
+	public S getStateForKey(Serializable key) throws Exception {
+		return fetchedState.get(key);
+	}
+
+	@Override
+	public void setStateForKey(Serializable key, S state) {
+		fetchedState.put(key, state);
+	}
+
+	/**
+	 * Returns the live backing map of this store, not a copy.
+	 */
+	@Override
+	public Map<Serializable, S> getPartitionedState() throws Exception {
+		return fetchedState;
+	}
+
+	/**
+	 * Snapshots every held state and wraps each snapshot in a new state handle,
+	 * returned in a freshly allocated map keyed by partition key.
+	 */
+	@Override
+	public Map<Serializable, StateHandle<C>> snapshotStates(long checkpointId,
+			long checkpointTimestamp) {
+		Map<Serializable, StateHandle<C>> snapshotHandles = new HashMap<Serializable, StateHandle<C>>();
+		for (Entry<Serializable, S> partition : fetchedState.entrySet()) {
+			C checkpoint = checkpointer.snapshotState(partition.getValue(), checkpointId,
+					checkpointTimestamp);
+			snapshotHandles.put(partition.getKey(), provider.createStateHandle(checkpoint));
+		}
+		return snapshotHandles;
+	}
+
+	/**
+	 * Materializes every given snapshot right away, overwriting any state already
+	 * held for the same key.
+	 */
+	@Override
+	public void restoreStates(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
+		for (Entry<Serializable, StateHandle<C>> restored : snapshots.entrySet()) {
+			C checkpoint = restored.getValue().getState();
+			fetchedState.put(restored.getKey(), checkpointer.restoreState(checkpoint));
+		}
+	}
+
+	@Override
+	public boolean containsKey(Serializable key) {
+		return fetchedState.containsKey(key);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java
new file mode 100644
index 0000000..9872a0c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.runtime.state.PartitionedStateStore;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+
+/**
+ * Implementation of the {@link PartitionedStateStore} interface for lazy
+ * retrieval and snapshotting of the partitioned operator states. Lazy state
+ * access considerably speeds up recovery and makes resource access smoother by
+ * avoiding request congestion in the persistent storage layer.
+ *
+ * <p>
+ * Every key is held in at most one of two maps: {@code unfetchedState} holds
+ * restored state handles that have not been read yet, {@code fetchedState}
+ * holds materialized states. A key moves from the former to the latter on its
+ * first access.
+ * </p>
+ *
+ * <p>
+ * The logic implemented here can also be used later to push unused state to the
+ * persistent layer. Handles of states that were never fetched are reused as-is
+ * by {@link #snapshotStates(long, long)} instead of being re-snapshotted.
+ * </p>
+ *
+ * @param <S>
+ *            Type of the operator states.
+ * @param <C>
+ *            Type of the state checkpoints.
+ */
+public class LazyStateStore<S, C extends Serializable> implements PartitionedStateStore<S, C> {
+
+	protected StateCheckpointer<S, C> checkpointer;
+	protected StateHandleProvider<C> provider;
+
+	// Restored handles whose state has not been materialized yet.
+	Map<Serializable, StateHandle<C>> unfetchedState;
+	// Materialized states, keyed by partition key.
+	Map<Serializable, S> fetchedState;
+
+	public LazyStateStore(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
+		this.checkpointer = checkpointer;
+		this.provider = provider;
+
+		unfetchedState = new HashMap<Serializable, StateHandle<C>>();
+		fetchedState = new HashMap<Serializable, S>();
+	}
+
+	/**
+	 * Returns the state for {@code key}, materializing it from its restored
+	 * handle on first access, or {@code null} if the key is unknown.
+	 */
+	@Override
+	public S getStateForKey(Serializable key) throws Exception {
+		S state = fetchedState.get(key);
+		if (state != null) {
+			return state;
+		}
+		StateHandle<C> handle = unfetchedState.get(key);
+		if (handle == null) {
+			return null;
+		}
+		state = checkpointer.restoreState(handle.getState());
+		// Only drop the handle once the state was restored successfully.
+		fetchedState.put(key, state);
+		unfetchedState.remove(key);
+		return state;
+	}
+
+	@Override
+	public void setStateForKey(Serializable key, S state) {
+		fetchedState.put(key, state);
+		// The new value supersedes any not-yet-fetched restored handle.
+		unfetchedState.remove(key);
+	}
+
+	/**
+	 * Materializes all remaining restored handles and returns the live map of
+	 * all states (not a copy).
+	 */
+	@Override
+	public Map<Serializable, S> getPartitionedState() throws Exception {
+		Iterator<Entry<Serializable, StateHandle<C>>> pending = unfetchedState.entrySet().iterator();
+		while (pending.hasNext()) {
+			Entry<Serializable, StateHandle<C>> handleEntry = pending.next();
+			fetchedState.put(handleEntry.getKey(),
+					checkpointer.restoreState(handleEntry.getValue().getState()));
+			// Remove per entry so a failed restore leaves both maps consistent.
+			pending.remove();
+		}
+		return fetchedState;
+	}
+
+	/**
+	 * Returns a new map with a handle for every known key: unfetched handles are
+	 * reused as-is, fetched states are snapshotted into fresh handles. Internal
+	 * state is not modified.
+	 */
+	@Override
+	public Map<Serializable, StateHandle<C>> snapshotStates(long checkpointId,
+			long checkpointTimestamp) {
+		// Returning (or writing into) unfetchedState would make the "snapshot" change
+		// with later updates, and getPartitionedState() would later restore those
+		// handles over newer, possibly in-place-mutated, fetched states.
+		Map<Serializable, StateHandle<C>> handles = new HashMap<Serializable, StateHandle<C>>(
+				unfetchedState);
+		for (Entry<Serializable, S> stateEntry : fetchedState.entrySet()) {
+			handles.put(stateEntry.getKey(), provider.createStateHandle(checkpointer
+					.snapshotState(stateEntry.getValue(), checkpointId, checkpointTimestamp)));
+		}
+		return handles;
+	}
+
+	/**
+	 * Registers the given handles for lazy restoration. Restored snapshots
+	 * supersede states already materialized for the same keys.
+	 */
+	@Override
+	public void restoreStates(Map<Serializable, StateHandle<C>> snapshots) {
+		fetchedState.keySet().removeAll(snapshots.keySet());
+		unfetchedState.putAll(snapshots);
+	}
+
+	@Override
+	public boolean containsKey(Serializable key) {
+		return fetchedState.containsKey(key) || unfetchedState.containsKey(key);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
new file mode 100644
index 0000000..26b2a88
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.PartitionedStateStore;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+/**
+ * Implementation of the {@link OperatorState} interface for partitioned user
+ * states. It provides methods for checkpointing and restoring partitioned
+ * operator states upon failure.
+ *
+ * <p>
+ * The partition is selected by applying the key selector to the input last
+ * passed to {@link #setCurrentInput(Object)}.
+ * </p>
+ *
+ * @param <IN>
+ *            Input type of the underlying {@link OneInputStreamOperator}
+ * @param <S>
+ *            Type of the underlying {@link OperatorState}.
+ * @param <C>
+ *            Type of the state snapshot.
+ */
+public class PartitionedStreamOperatorState<IN, S, C extends Serializable> extends
+		StreamOperatorState<S, C> {
+
+	// KeySelector for getting the state partition key for each input
+	private final KeySelector<IN, Serializable> keySelector;
+
+	private final PartitionedStateStore<S, C> stateStore;
+
+	// Returned by getState() for partitions without state of their own.
+	// NOTE(review): one instance is shared by all such partitions, so mutating a
+	// mutable default in place affects every one of them.
+	private S defaultState;
+
+	// The currently processed input, used to extract the appropriate key
+	private IN currentInput;
+
+	public PartitionedStreamOperatorState(StateCheckpointer<S, C> checkpointer,
+			StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector) {
+		super(checkpointer, provider);
+		this.keySelector = keySelector;
+		this.stateStore = new EagerStateStore<S, C>(checkpointer, provider);
+	}
+
+	@SuppressWarnings("unchecked")
+	public PartitionedStreamOperatorState(StateHandleProvider<C> provider,
+			KeySelector<IN, Serializable> keySelector) {
+		this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider, keySelector);
+	}
+
+	/**
+	 * Returns the state of the current input's partition, the default state if
+	 * that partition has none yet, or {@code null} if no input has been set.
+	 */
+	@Override
+	public S getState() {
+		if (currentInput == null) {
+			return null;
+		}
+		try {
+			Serializable key = currentKey();
+			return stateStore.containsKey(key) ? stateStore.getStateForKey(key) : defaultState;
+		} catch (RuntimeException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Sets the state of the current input's partition.
+	 *
+	 * @throws IllegalStateException if no input has been set
+	 */
+	@Override
+	public void updateState(S state) {
+		if (currentInput == null) {
+			throw new IllegalStateException("Need a valid input for updating a state.");
+		}
+		try {
+			stateStore.setStateForKey(currentKey(), state);
+		} catch (RuntimeException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void setDefaultState(S defaultState) {
+		this.defaultState = defaultState;
+	}
+
+	public void setCurrentInput(IN input) {
+		currentInput = input;
+	}
+
+	@Override
+	public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
+			long checkpointTimestamp) throws Exception {
+		return stateStore.snapshotStates(checkpointId, checkpointTimestamp);
+	}
+
+	@Override
+	public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
+		stateStore.restoreStates(snapshots);
+	}
+
+	@Override
+	public Map<Serializable, S> getPartitionedState() throws Exception {
+		return stateStore.getPartitionedState();
+	}
+
+	// Extracts the partition key of the current input.
+	private Serializable currentKey() throws Exception {
+		return keySelector.getKey(currentInput);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
new file mode 100644
index 0000000..90a3726
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+
+/**
+ * Implementation of the {@link OperatorState} interface for non-partitioned
+ * user states. It provides methods for checkpointing and restoring operator
+ * states upon failure using the provided {@link StateCheckpointer} and
+ * {@link StateHandleProvider}.
+ *
+ * @param <S>
+ *            Type of the underlying {@link OperatorState}.
+ * @param <C>
+ *            Type of the state snapshot.
+ */
+public class StreamOperatorState<S, C extends Serializable> implements OperatorState<S> {
+
+	// Key under which the single, non-partitioned state appears in snapshot and
+	// partitioned-state maps.
+	protected static final Serializable DEFAULTKEY = -1;
+
+	private S state;
+	private StateCheckpointer<S, C> checkpointer;
+	private final StateHandleProvider<C> provider;
+
+	public StreamOperatorState(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
+		this.checkpointer = checkpointer;
+		this.provider = provider;
+	}
+
+	@SuppressWarnings("unchecked")
+	public StreamOperatorState(StateHandleProvider<C> provider) {
+		this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider);
+	}
+
+	@Override
+	public S getState() {
+		return state;
+	}
+
+	@Override
+	public void updateState(S state) {
+		this.state = state;
+	}
+
+	public void setDefaultState(S defaultState) {
+		updateState(defaultState);
+	}
+
+	public StateCheckpointer<S, C> getCheckpointer() {
+		return checkpointer;
+	}
+
+	public void setCheckpointer(StateCheckpointer<S, C> checkpointer) {
+		this.checkpointer = checkpointer;
+	}
+
+	protected StateHandleProvider<C> getStateHandleProvider() {
+		return provider;
+	}
+
+	/**
+	 * Snapshots the current state and wraps it in a single state handle.
+	 *
+	 * @return an immutable map holding the handle under {@link #DEFAULTKEY}
+	 */
+	public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
+			long checkpointTimestamp) throws Exception {
+		StateHandle<C> handle = provider.createStateHandle(checkpointer.snapshotState(
+				getState(), checkpointId, checkpointTimestamp));
+		return Collections.singletonMap(DEFAULTKEY, handle);
+	}
+
+	/**
+	 * Restores the state from the handle stored under {@link #DEFAULTKEY}.
+	 *
+	 * @throws IllegalArgumentException if {@code snapshots} has no entry for
+	 *             {@link #DEFAULTKEY}
+	 */
+	public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
+		StateHandle<C> handle = snapshots.get(DEFAULTKEY);
+		if (handle == null) {
+			throw new IllegalArgumentException("No state snapshot found under key " + DEFAULTKEY);
+		}
+		updateState(checkpointer.restoreState(handle.getState()));
+	}
+
+	/**
+	 * Returns an immutable single-entry map holding the current state under
+	 * {@link #DEFAULTKEY}.
+	 */
+	public Map<Serializable, S> getPartitionedState() throws Exception {
+		// Collections.singletonMap permits a null value (the state before any
+		// update), unlike Guava's ImmutableMap.of, which throws on null.
+		return Collections.singletonMap(DEFAULTKEY, state);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
index 3905558..247fe25 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
@@ -17,9 +17,9 @@
package org.apache.flink.streaming.runtime.io;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BlockingQueue;
-import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.runtime.iterative.concurrent.Broker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index 941ddd2..c212346 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -17,14 +17,14 @@
package org.apache.flink.streaming.runtime.io;
+import java.io.IOException;
+
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import java.io.IOException;
-
public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
private long timeout;
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 2360aa8..6750b52 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.runtime.tasks;
+import java.io.IOException;
+
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -29,8 +31,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index e69f533..4952cdf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.collector.StreamOutput;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index db95dcc..e7f4d9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.functors.NotNullPredicate;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.event.task.TaskEvent;
@@ -41,6 +42,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
+import org.apache.flink.streaming.api.state.StreamOperatorState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,10 +105,19 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
return getEnvironment().getTaskName();
}
+ /**
+ * Creates the runtime context for the operator's user function. The context carries
+ * key-partitioned operator state when the config provides a state partitioner, and a
+ * single non-partitioned state otherwise.
+ *
+ * @param conf the stream config of the operator
+ * @return the runtime context for the operator
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) {
Environment env = getEnvironment();
- return new StreamingRuntimeContext(conf.getStreamOperator(userClassLoader).getClass()
- .getSimpleName(), env, getUserCodeClassLoader(), getExecutionConfig());
+ // The context is named after the operator's simple class name.
+ String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName();
+
+ KeySelector<?, Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
+ StateHandleProvider<Serializable> handleProvider = getStateHandleProvider();
+
+ StreamOperatorState state;
+ if (statePartitioner != null) {
+ state = new PartitionedStreamOperatorState(handleProvider, statePartitioner);
+ } else {
+ state = new StreamOperatorState(handleProvider);
+ }
+
+ return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(),
+ getExecutionConfig(), state);
}
private StateHandleProvider<Serializable> getStateHandleProvider() {
@@ -129,7 +141,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
switch (backend) {
case JOBMANAGER:
LOG.info("State backend for state checkpoints is set to jobmanager.");
- return LocalStateHandle.createProvider();
+ return new LocalStateHandle.LocalStateHandleProvider<Serializable>();
case FILESYSTEM:
String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
if (checkpointDir != null) {
@@ -294,13 +306,13 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
// we do nothing here so far. this should call commit on the source function, for example
synchronized (checkpointLock) {
if (streamOperator instanceof StatefulStreamOperator) {
- ((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp);
+ ((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp, null);
}
if (hasChainedOperators) {
for (OneInputStreamOperator<?, ?> chainedOperator : outputHandler.getChainedOperators()) {
if (chainedOperator instanceof StatefulStreamOperator) {
- ((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp);
+ ((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp, null);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 6112e03..5fd158c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -18,13 +18,18 @@
package org.apache.flink.streaming.runtime.tasks;
+import java.io.Serializable;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.streaming.api.state.StreamOperatorState;
/**
* Implementation of the {@link RuntimeContext}, created by runtime stream UDF
@@ -33,13 +38,16 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
public class StreamingRuntimeContext extends RuntimeUDFContext {
private final Environment env;
+
+ /**
+ * Operator state handed out by the {@code getOperatorState} methods. It is kept raw
+ * because each caller chooses its own state type parameters. Assigned once in the
+ * constructor.
+ */
+ @SuppressWarnings("rawtypes")
+ private final StreamOperatorState state;
+
+ /**
+ * Creates a runtime context for a streaming operator.
+ *
+ * @param name the name of the operator
+ * @param env the task environment supplying parallelism, subtask index and cache entries
+ * @param userCodeClassLoader the class loader for user code
+ * @param executionConfig the execution config of the job
+ * @param state the operator state exposed through {@code getOperatorState}
+ */
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
- ExecutionConfig executionConfig) {
+ ExecutionConfig executionConfig, StreamOperatorState<?, ?> state) {
super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
executionConfig, env.getDistributedCacheEntries());
this.env = env;
+ this.state = state;
}
/**
@@ -60,5 +68,25 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
public Configuration getTaskStubParameters() {
return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
}
+
+ /**
+ * Returns the operator state, configured with both a default value and a custom
+ * checkpointer.
+ * <p>
+ * The default value must be registered here as well. Otherwise a function that passes a
+ * custom checkpointer would silently lose its default, unlike with the
+ * {@code getOperatorState(S)} overload.
+ *
+ * @param defaultState the default value, passed to the state's {@code setDefaultState}
+ * @param checkpointer the checkpointer used to snapshot and restore the state
+ * @return the operator state of this context, viewed with the caller's state type
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S, C extends Serializable> OperatorState<S> getOperatorState(S defaultState,
+ StateCheckpointer<S, C> checkpointer) {
+ state.setCheckpointer(checkpointer);
+ state.setDefaultState(defaultState);
+ return (OperatorState<S>) state;
+ }
+
+ /**
+ * Returns the operator state after registering {@code defaultState} as its default value.
+ *
+ * @param defaultState the default value, passed to the state's {@code setDefaultState}
+ * @return the operator state of this context, viewed with the caller's state type
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S extends Serializable> OperatorState<S> getOperatorState(S defaultState) {
+ state.setDefaultState(defaultState);
+ OperatorState<S> typedState = (OperatorState<S>) state;
+ return typedState;
+ }
+
+ /**
+ * Returns the operator state without changing its default value or checkpointer.
+ *
+ * @return the operator state of this context, viewed with the caller's state type
+ */
+ @SuppressWarnings("unchecked")
+ public <S extends Serializable> OperatorState<S> getOperatorState() {
+ OperatorState<S> current = (OperatorState<S>) state;
+ return current;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 67119f7..2052877 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.runtime.io.CoRecordReader;
import org.apache.flink.streaming.runtime.io.InputGateFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index 87c9757..333bcdd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -17,10 +17,10 @@
package org.apache.flink.streaming.util.serialization;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
import java.io.Serializable;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java
new file mode 100644
index 0000000..f4c1a89
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+
+/**
+ * Tests snapshotting and restoring operator state, both non-partitioned and partitioned
+ * by key, as seen through a {@link RichMapFunction} that counts the records it maps.
+ */
+public class StatefulFunctionTest {
+
+ /**
+ * Counts five records in non-partitioned state, snapshots it, and restores it into a
+ * fresh state. Then checks that a new mapper continues counting from the restored value.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void simpleStateTest() throws Exception {
+
+ StatefulMapper mapper = new StatefulMapper();
+ MockContext context = new MockContext(false, mapper);
+ mapper.setRuntimeContext(context);
+ mapper.open(null);
+
+ assertEquals(Arrays.asList("1", "2", "3", "4", "5"),
+ applyOnSequence(mapper, 1, 5, context.state));
+ assertEquals((Integer) 5, context.state.getState());
+
+ byte[] serializedState = InstantiationUtil.serializeObject(context.state
+ .snapshotState(1, 1));
+
+ StatefulMapper restoredMapper = new StatefulMapper();
+ MockContext restoredContext = new MockContext(false, restoredMapper);
+ // A freshly created state holds no value yet.
+ assertEquals(null, restoredContext.state.getState());
+
+ // The restored mapper must be wired to its OWN context. Wiring it to the original
+ // context would make it read and update the old state instead of the restored one.
+ restoredMapper.setRuntimeContext(restoredContext);
+ restoredMapper.open(null);
+
+ Map<Serializable, StateHandle<Integer>> deserializedState = (Map<Serializable, StateHandle<Integer>>) InstantiationUtil
+ .deserializeObject(serializedState, Thread.currentThread().getContextClassLoader());
+
+ restoredContext.state.restoreState(deserializedState);
+
+ assertEquals((Integer) 5, restoredContext.state.getState());
+
+ // The restored mapper continues counting from the restored value.
+ assertEquals(Arrays.asList("6"), applyOnSequence(restoredMapper, 6, 6, restoredContext.state));
+ assertEquals((Integer) 6, restoredContext.state.getState());
+ }
+
+ /**
+ * Same as {@link #simpleStateTest()}, but with the state partitioned by {@code value % 2},
+ * so each key keeps its own count.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void partitionedStateTest() throws Exception {
+ StatefulMapper mapper = new StatefulMapper();
+ MockContext context = new MockContext(true, mapper);
+ mapper.setRuntimeContext(context);
+ mapper.open(null);
+
+ assertEquals(Arrays.asList("1", "2", "3", "4", "5"),
+ applyOnSequence(mapper, 1, 5, context.state));
+ assertEquals(ImmutableMap.of(0, 2, 1, 3), context.state.getPartitionedState());
+
+ byte[] serializedState = InstantiationUtil.serializeObject(context.state
+ .snapshotState(1, 1));
+
+ StatefulMapper restoredMapper = new StatefulMapper();
+ MockContext restoredContext = new MockContext(true, restoredMapper);
+ // A freshly created state holds no value yet.
+ assertEquals(null, restoredContext.state.getState());
+
+ // Wire the restored mapper to its own context, not to the original one.
+ restoredMapper.setRuntimeContext(restoredContext);
+ restoredMapper.open(null);
+
+ Map<Serializable, StateHandle<Integer>> deserializedState = (Map<Serializable, StateHandle<Integer>>) InstantiationUtil
+ .deserializeObject(serializedState, Thread.currentThread().getContextClassLoader());
+
+ restoredContext.state.restoreState(deserializedState);
+
+ assertEquals(ImmutableMap.of(0, 2, 1, 3), restoredContext.state.getPartitionedState());
+
+ // 6 % 2 == 0, so only the restored count of key 0 advances.
+ assertEquals(Arrays.asList("6"), applyOnSequence(restoredMapper, 6, 6, restoredContext.state));
+ assertEquals(ImmutableMap.of(0, 3, 1, 3), restoredContext.state.getPartitionedState());
+ }
+
+ /**
+ * Maps every integer in {@code [from, to]} through {@code mapper}. For partitioned state,
+ * each input is first set as the current input so the state resolves the right key.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private <T> List<T> applyOnSequence(MapFunction<Integer, T> mapper, int from, int to,
+ StreamOperatorState state) throws Exception {
+ List<T> output = new ArrayList<T>();
+ for (int i = from; i <= to; i++) {
+ if (state instanceof PartitionedStreamOperatorState) {
+ ((PartitionedStreamOperatorState) state).setCurrentInput(i);
+ }
+ output.add(mapper.map(i));
+ }
+ return output;
+ }
+
+ /** Partitions integers by their remainder modulo {@code base}. */
+ public static class ModKey implements KeySelector<Integer, Serializable> {
+
+ private static final long serialVersionUID = 4193026742083046736L;
+
+ int base;
+
+ public ModKey(int base) {
+ this.base = base;
+ }
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value % base;
+ }
+
+ }
+
+ /** Counts mapped records in its operator state (default 0) and emits each value as a string. */
+ public static class StatefulMapper extends RichMapFunction<Integer, String> {
+
+ private static final long serialVersionUID = -9007873655253339356L;
+ OperatorState<Integer> opState;
+
+ @Override
+ public String map(Integer value) throws Exception {
+ opState.updateState(opState.getState() + 1);
+ return value.toString();
+ }
+
+ @Override
+ public void open(Configuration conf) {
+ opState = getRuntimeContext().getOperatorState(0);
+ }
+ }
+
+ /**
+ * Minimal runtime context that exposes a single operator state, either non-partitioned or
+ * partitioned by {@link ModKey}(2). All other accessors return null or do nothing. The
+ * {@code mapper} constructor argument is not used.
+ */
+ public static class MockContext implements RuntimeContext {
+
+ StreamOperatorState<Integer, Integer> state;
+
+ public MockContext(boolean isPartitionedState, StatefulMapper mapper) {
+ if (isPartitionedState) {
+ this.state = new PartitionedStreamOperatorState<Integer, Integer, Integer>(
+ new LocalStateHandleProvider<Integer>(), new ModKey(2));
+ } else {
+ this.state = new StreamOperatorState<Integer, Integer>(
+ new LocalStateHandleProvider<Integer>());
+ }
+ }
+
+ public String getTaskName() {return null;}
+ public int getNumberOfParallelSubtasks() {return 0;}
+ public int getIndexOfThisSubtask() {return 0;}
+ public ExecutionConfig getExecutionConfig() {return null;}
+ public ClassLoader getUserCodeClassLoader() {return null;}
+ public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
+ public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {return null;}
+ public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {return null;}
+ public IntCounter getIntCounter(String name) {return null;}
+ public LongCounter getLongCounter(String name) {return null;}
+ public DoubleCounter getDoubleCounter(String name) {return null;}
+ public Histogram getHistogram(String name) {return null;}
+ public <RT> List<RT> getBroadcastVariable(String name) {return null;}
+ public <T, C> C getBroadcastVariableWithInitializer(String name,
+ BroadcastVariableInitializer<T, C> initializer) {return null;}
+ public DistributedCache getDistributedCache() {return null;}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S, C extends Serializable> OperatorState<S> getOperatorState(S defaultState,
+ StateCheckpointer<S, C> checkpointer) {
+ // Apply both the requested checkpointer and the default state.
+ state.setCheckpointer((StateCheckpointer<Integer, Integer>) (StateCheckpointer<?, ?>) checkpointer);
+ state.setDefaultState((Integer) defaultState);
+ return (OperatorState<S>) state;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S extends Serializable> OperatorState<S> getOperatorState(S defaultState) {
+ state.setDefaultState((Integer) defaultState);
+ return (OperatorState<S>) state;
+ }
+
+ }
+
+}