You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:35 UTC
[08/24] flink git commit: [FLINK-2808] [streaming] Refactor and
extend state backend abstraction
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index dc9a152..7a1bea4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -19,26 +19,17 @@
package org.apache.flink.streaming.api.operators;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.state.OperatorStateHandle;
-import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
-import org.apache.flink.streaming.api.state.StreamOperatorState;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import static java.util.Objects.requireNonNull;
/**
* This is used as the base class for operators that have a user-defined
@@ -50,22 +41,20 @@ import org.slf4j.LoggerFactory;
* @param <F>
* The type of the user function
*/
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
- extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
-
+
/** the user function */
protected final F userFunction;
/** Flag to prevent duplicate function.close() calls in close() and dispose() */
- private boolean functionsClosed = false;
-
+ private transient boolean functionsClosed = false;
+
public AbstractUdfStreamOperator(F userFunction) {
- this.userFunction = Objects.requireNonNull(userFunction);
+ this.userFunction = requireNonNull(userFunction);
}
/**
@@ -79,18 +68,13 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
// ------------------------------------------------------------------------
// operator life cycle
// ------------------------------------------------------------------------
-
- @Override
- public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
- super.setup(output, runtimeContext);
- FunctionUtils.setFunctionRuntimeContext(userFunction, runtimeContext);
- }
-
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- FunctionUtils.openFunction(userFunction, parameters);
+ public void open() throws Exception {
+ super.open();
+
+ FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
+ FunctionUtils.openFunction(userFunction, new Configuration());
}
@Override
@@ -118,76 +102,81 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
// ------------------------------------------------------------------------
@Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
+ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+ StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);
- // Restore state using the Checkpointed interface
- if (userFunction instanceof Checkpointed && snapshots.f0 != null) {
- ((Checkpointed) userFunction).restoreState(snapshots.f0.getState(runtimeContext.getUserCodeClassLoader()));
- }
-
- if (snapshots.f1 != null) {
- // We iterate over the states registered for this operator, initialize and restore it
- for (Entry<String, OperatorStateHandle> snapshot : snapshots.f1.entrySet()) {
- StreamOperatorState restoredOpState = runtimeContext.getState(snapshot.getKey(), snapshot.getValue().isPartitioned());
- StateHandle<Serializable> checkpointHandle = snapshot.getValue();
- restoredOpState.restoreState(checkpointHandle, runtimeContext.getUserCodeClassLoader());
+ if (userFunction instanceof Checkpointed) {
+ @SuppressWarnings("unchecked")
+ Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
+
+ Serializable udfState;
+ try {
+ udfState = chkFunction.snapshotState(checkpointId, timestamp);
+ }
+ catch (Exception e) {
+ throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
+ }
+
+ if (udfState != null) {
+ try {
+ StateBackend<?> stateBackend = getStateBackend();
+ StateHandle<Serializable> handle =
+ stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
+ state.setFunctionState(handle);
+ }
+ catch (Exception e) {
+ throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
+ + e.getMessage(), e);
+ }
}
}
+ return state;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp)
- throws Exception {
- // Get all the states for the operator
- Map<String, StreamOperatorState<?, ?>> operatorStates = runtimeContext.getOperatorStates();
+ @Override
+ public void restoreState(StreamTaskState state) throws Exception {
+ super.restoreState(state);
- Map<String, OperatorStateHandle> operatorStateSnapshots;
- if (operatorStates.isEmpty()) {
- // We return null to signal that there is nothing to checkpoint
- operatorStateSnapshots = null;
- } else {
- // Checkpoint the states and store the handles in a map
- Map<String, OperatorStateHandle> snapshots = new HashMap<String, OperatorStateHandle>();
-
- for (Entry<String, StreamOperatorState<?, ?>> state : operatorStates.entrySet()) {
- boolean isPartitioned = state.getValue() instanceof PartitionedStreamOperatorState;
- snapshots.put(state.getKey(),
- new OperatorStateHandle(state.getValue().snapshotState(checkpointId, timestamp),
- isPartitioned));
- }
-
- operatorStateSnapshots = snapshots;
- }
+ StateHandle<Serializable> stateHandle = state.getFunctionState();
- StateHandle<Serializable> checkpointedSnapshot = null;
- // if the UDF implements the Checkpointed interface we draw a snapshot
- if (userFunction instanceof Checkpointed) {
- StateHandleProvider<Serializable> provider = runtimeContext.getStateHandleProvider();
- Serializable state = ((Checkpointed) userFunction).snapshotState(checkpointId, timestamp);
- if (state != null) {
- checkpointedSnapshot = provider.createStateHandle(state);
+ if (userFunction instanceof Checkpointed && stateHandle != null) {
+ @SuppressWarnings("unchecked")
+ Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
+
+ Serializable functionState = stateHandle.getState(getUserCodeClassloader());
+ if (functionState != null) {
+ try {
+ chkFunction.restoreState(functionState);
+ }
+ catch (Exception e) {
+ throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
+ }
}
}
-
- // if we have either operator or checkpointed state we store it in a
- // tuple2 otherwise return null
- if (operatorStateSnapshots != null || checkpointedSnapshot != null) {
- return Tuple2.of(checkpointedSnapshot, operatorStateSnapshots);
- } else {
- return null;
- }
-
}
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ @Override
+ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
+ super.notifyOfCompletedCheckpoint(checkpointId);
+
if (userFunction instanceof CheckpointNotifier) {
- try {
- ((CheckpointNotifier) userFunction).notifyCheckpointComplete(checkpointId);
- } catch (Exception e) {
- throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e);
- }
+ ((CheckpointNotifier) userFunction).notifyCheckpointComplete(checkpointId);
}
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the parameters (configuration) for the user function.
+ * Since the streaming API does not implement any parametrization of functions via a
+ * configuration, the config returned here is actually empty.
+ *
+ * @return The user function parameters (currently empty)
+ */
+ public Configuration getUserFunctionParameters() {
+ return new Configuration();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
new file mode 100644
index 0000000..3a752b0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+/**
+ * Defines the chaining scheme for the operator.
+ * By default {@link #ALWAYS} is used, which means operators will be eagerly chained whenever possible.
+ */
+public enum ChainingStrategy {
+
+ /**
+ * Chaining will happen even if chaining is disabled on the execution environment.
+ * This should only be used by system-level operators, not operators implemented by users.
+ */
+ FORCE_ALWAYS,
+
+ /**
+ * Operators will be eagerly chained whenever possible, for
+ * maximal performance. It is generally a good practice to allow maximal
+ * chaining and increase operator parallelism.
+ */
+ ALWAYS,
+
+ /**
+ * The operator will not be chained to the preceding or succeeding operators.
+ */
+ NEVER,
+
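+ /** The operator will not be chained to its predecessor, but successor operators may chain to it. */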
+ HEAD
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
index 7ca540f..705c1b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
@@ -35,7 +35,7 @@ public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
* Processes one element that arrived at this operator.
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
- public void processElement(StreamRecord<IN> element) throws Exception;
+ void processElement(StreamRecord<IN> element) throws Exception;
/**
* Processes a {@link Watermark}.
@@ -43,5 +43,5 @@ public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*/
- public void processWatermark(Watermark mark) throws Exception;
+ void processWatermark(Watermark mark) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index b68432604..0cbc954 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -34,7 +34,7 @@ public interface Output<T> extends Collector<T> {
* operators.
*
* <p>A watermark specifies that no element with a timestamp older or equal to the watermark
- * timestamp will be emitted in the future.</p>
+ * timestamp will be emitted in the future.
*/
void emitWatermark(Watermark mark);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
deleted file mode 100644
index d400fc4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.OperatorStateHandle;
-
-/**
- * Interface for Stream operators that can have state. This interface is used for checkpointing
- * and restoring that state.
- *
- * @param <OUT> The output type of the operator
- */
-public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> {
-
- void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state) throws Exception;
-
- Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
-
- void notifyCheckpointComplete(long checkpointId) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index ff7f662..23b638e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -36,8 +35,8 @@ public class StreamFlatMap<IN, OUT>
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open() throws Exception {
+ super.open();
collector = new TimestampedCollector<OUT>(output);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 732630a..79e319a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -22,53 +22,41 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.HashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.state.KVMapCheckpointer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
+public class StreamGroupedFold<IN, OUT, KEY>
+ extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
// Grouped values
- private KeySelector<IN, ?> keySelector;
- private transient OperatorState<HashMap<Object, OUT>> values;
-
+ private transient OperatorState<OUT> values;
+
+ private transient OUT initialValue;
+
// Initial value serialization
private byte[] serializedInitialValue;
+
private TypeSerializer<OUT> outTypeSerializer;
- private transient OUT initialValue;
-
- // Store the typeinfo, create serializer during runtime
- private TypeInformation<Object> keyTypeInformation;
-
- @SuppressWarnings("unchecked")
- public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
- OUT initialValue, TypeInformation<IN> inTypeInformation) {
+
+ public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
super(folder);
- this.keySelector = keySelector;
this.initialValue = initialValue;
- keyTypeInformation = (TypeInformation<Object>) TypeExtractor
- .getKeySelectorTypes(keySelector, inTypeInformation);
-
}
@Override
- public void open(Configuration configuration) throws Exception {
- super.open(configuration);
+ public void open() throws Exception {
+ super.open();
if (serializedInitialValue == null) {
throw new RuntimeException("No initial value was serialized for the fold " +
@@ -80,25 +68,20 @@ public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, F
new DataInputStream(bais)
);
initialValue = outTypeSerializer.deserialize(in);
-
- values = runtimeContext.getOperatorState("flink_internal_fold_values",
- new HashMap<Object, OUT>(), false,
- new KVMapCheckpointer<>(keyTypeInformation.createSerializer(executionConfig),
- outTypeSerializer));
+ values = createKeyValueState(outTypeSerializer, null);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
- Object key = keySelector.getKey(element.getValue());
- OUT value = values.value().get(key);
+ OUT value = values.value();
if (value != null) {
OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
- values.value().put(key, folded);
+ values.update(folded);
output.collect(element.replace(folded));
} else {
OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
- values.value().put(key, first);
+ values.update(first);
output.collect(element.replace(first));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 579814d..ebc4b09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -19,61 +19,43 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.state.KVMapCheckpointer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import java.util.HashMap;
-
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
+
+ private transient OperatorState<IN> values;
+
+ private TypeSerializer<IN> serializer;
- private KeySelector<IN, ?> keySelector;
- private transient OperatorState<HashMap<Object, IN>> values;
-
- // Store the typeinfo, create serializer during runtime
- private TypeInformation<Object> keyTypeInformation;
- private TypeInformation<IN> valueTypeInformation;
-
- @SuppressWarnings("unchecked")
- public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector,
- TypeInformation<IN> typeInformation) {
+
+ public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
super(reducer);
- this.keySelector = keySelector;
- valueTypeInformation = typeInformation;
- keyTypeInformation = (TypeInformation<Object>) TypeExtractor
- .getKeySelectorTypes(keySelector, typeInformation);
+ this.serializer = serializer;
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- values = runtimeContext.getOperatorState("flink_internal_reduce_values",
- new HashMap<Object, IN>(), false,
- new KVMapCheckpointer<>(keyTypeInformation.createSerializer(executionConfig),
- valueTypeInformation.createSerializer(executionConfig)));
+ public void open() throws Exception {
+ super.open();
+ values = createKeyValueState(serializer, null);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
- Object key = keySelector.getKey(element.getValue());
-
- IN currentValue = values.value().get(key);
+ IN value = element.getValue();
+ IN currentValue = values.value();
+
if (currentValue != null) {
- // TODO: find a way to let operators copy elements (maybe)
- IN reduced = userFunction.reduce(currentValue, element.getValue());
- values.value().put(key, reduced);
+ IN reduced = userFunction.reduce(currentValue, value);
+ values.update(reduced);
output.collect(element.replace(reduced));
} else {
- values.value().put(key, element.getValue());
- output.collect(element.replace(element.getValue()));
+ values.update(value);
+ output.collect(element.replace(value));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index d65dc64..fac26f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,9 +19,10 @@ package org.apache.flink.streaming.api.operators;
import java.io.Serializable;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
/**
* Basic interface for stream operators. Implementers would implement one of
@@ -29,27 +30,25 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
* {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
* that process elements.
*
- * <p>
- * The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
+ * <p> The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
* offers default implementation for the lifecycle and properties methods.
*
- * <p>
- * Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
+ * <p> Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
* the timer service, timer callbacks are also guaranteed not to be called concurrently with
* methods on {@code StreamOperator}.
*
* @param <OUT> The output type of the operator
*/
public interface StreamOperator<OUT> extends Serializable {
-
+
// ------------------------------------------------------------------------
- // Life Cycle
+ // life cycle
// ------------------------------------------------------------------------
/**
* Initializes the operator. Sets access to the context and the output.
*/
- void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext);
+ void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
/**
* This method is called immediately before any elements are processed, it should contain the
@@ -57,7 +56,7 @@ public interface StreamOperator<OUT> extends Serializable {
*
* @throws java.lang.Exception An exception in this method causes the operator to fail.
*/
- void open(Configuration config) throws Exception;
+ void open() throws Exception;
/**
* This method is called after all records have been added to the operators via the methods
@@ -82,43 +81,66 @@ public interface StreamOperator<OUT> extends Serializable {
* that the operator has acquired.
*/
void dispose();
-
// ------------------------------------------------------------------------
- // Context and chaining properties
+ // state snapshots
// ------------------------------------------------------------------------
+
+ /**
+ * Called to draw a state snapshot from the operator. This method snapshots the operator state
+ * (if the operator is stateful) and the key/value state (if it is being used and has been
+ * initialized).
+ *
+ * @param checkpointId The ID of the checkpoint.
+ * @param timestamp The timestamp of the checkpoint.
+ *
+ * @return The StreamTaskState object, possibly containing the snapshots for the
+ * operator and key/value state.
+ *
+ * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
+ * and the key/value state.
+ */
+ StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;
/**
- * Returns a context that allows the operator to query information about the execution and also
- * to interact with systems such as broadcast variables and managed state. This also allows
- * to register timers.
+ * Restores the operator state, if this operator's execution is recovering from a checkpoint.
+ * This method restores the operator state (if the operator is stateful) and the key/value state
+ * (if it had been used and was initialized when the snapshot occurred).
+ *
+ * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
+ * and before {@link #open()}.
+ *
+ * @param state The state of operator that was snapshotted as part of checkpoint
+ * from which the execution is restored.
+ *
+ * @throws Exception Exceptions during state restore should be forwarded, so that the system can
+ * properly react to failed state restore and fail the execution attempt.
*/
- StreamingRuntimeContext getRuntimeContext();
+ void restoreState(StreamTaskState state) throws Exception;
/**
+ * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
+ *
+ * @param checkpointId The ID of the checkpoint that has been completed.
+ *
+ * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
+ * the program to fail and enter recovery.
+ */
+ void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;
+
+ // ------------------------------------------------------------------------
+ // miscellaneous
+ // ------------------------------------------------------------------------
+
+ void setKeyContextElement(StreamRecord<?> record) throws Exception;
+
+ /**
* An operator can return true here to disable copying of its input elements. This overrides
* the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
*/
boolean isInputCopyingDisabled();
-
- void setChainingStrategy(ChainingStrategy strategy);
-
+
ChainingStrategy getChainingStrategy();
- /**
- * Defines the chaining scheme for the operator. By default <b>ALWAYS</b> is used,
- * which means operators will be eagerly chained whenever possible, for
- * maximal performance. It is generally a good practice to allow maximal
- * chaining and increase operator parallelism. </p> When the strategy is set
- * to <b>NEVER</b>, the operator will not be chained to the preceding or succeeding
- * operators.</p> <b>HEAD</b> strategy marks a start of a new chain, so that the
- * operator will not be chained to preceding operators, only succeding ones.
- *
- * <b>FORCE_ALWAYS</b> will enable chaining even if chaining is disabled on the execution
- * environment. This should only be used by system-level operators, not operators implemented
- * by users.
- */
- public static enum ChainingStrategy {
- FORCE_ALWAYS, ALWAYS, NEVER, HEAD
- }
+ void setChainingStrategy(ChainingStrategy strategy);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
index c0815b5..1ce4ff6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -53,8 +52,8 @@ public class StreamProject<IN, OUT extends Tuple>
}
@Override
- public void open(Configuration config) throws Exception {
- super.open(config);
+ public void open() throws Exception {
+ super.open();
outTuple = outSerializer.createInstance();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index ecf799b..fbecbd1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -43,7 +43,8 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
}
public void run(final Object lockingObject, final Output<StreamRecord<T>> collector) throws Exception {
-
+ final ExecutionConfig executionConfig = getExecutionConfig();
+
if (userFunction instanceof EventTimeSourceFunction) {
ctx = new ManualWatermarkContext<T>(lockingObject, collector);
} else if (executionConfig.getAutoWatermarkInterval() > 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
new file mode 100644
index 0000000..40998dd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext},
+ * for streaming operators.
+ */
+public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
+
+ /** The operator to which this function belongs */
+ private final AbstractStreamOperator<?> operator;
+
+ /** The task environment running the operator */
+ private final Environment taskEnvironment;
+
+ /** The key/value state, if the user-function requests it */
+ private OperatorState<?> keyValueState;
+
+ /** Type of the values stored in the state, to make sure repeated requests of the state are consistent */
+ private TypeInformation<?> stateTypeInfo;
+
+
+ public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
+ Environment env, Map<String, Accumulator<?, ?>> accumulators) {
+ super(env.getTaskName(),
+ env.getNumberOfSubtasks(),
+ env.getIndexInSubtaskGroup(),
+ env.getUserClassLoader(),
+ operator.getExecutionConfig(),
+ accumulators,
+ env.getDistributedCacheEntries());
+
+ this.operator = operator;
+ this.taskEnvironment = env;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the input split provider associated with the operator.
+ *
+ * @return The input split provider.
+ */
+ public InputSplitProvider getInputSplitProvider() {
+ return taskEnvironment.getInputSplitProvider();
+ }
+
+ /**
+ * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
+ * This call is guaranteed to not happen concurrently with method calls on the operator.
+ *
+ * @param time The absolute time in milliseconds.
+ * @param target The target to be triggered.
+ */
+ public void registerTimer(long time, Triggerable target) {
+ operator.registerTimer(time, target);
+ }
+
+ // ------------------------------------------------------------------------
+ // broadcast variables
+ // ------------------------------------------------------------------------
+
+ @Override
+ public <RT> List<RT> getBroadcastVariable(String name) {
+ throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
+ }
+
+ @Override
+ public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+ throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
+ }
+
+ // ------------------------------------------------------------------------
+ // key/value state
+ // ------------------------------------------------------------------------
+
+ @Override
+ public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+ requireNonNull(stateType, "The state type class must not be null");
+
+ TypeInformation<S> typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForClass(stateType);
+ }
+ catch (Exception e) { // keep 'e' as the cause so the type-extraction failure stays diagnosable
+ throw new RuntimeException("Cannot analyze type '" + stateType.getName() +
+ "' from the class alone, due to generic type parameters. " +
+ "Please specify the TypeInformation directly.", e);
+ }
+
+ return getKeyValueState(typeInfo, defaultState);
+ }
+
+ @Override
+ public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+ requireNonNull(stateType, "The state type information must not be null");
+
+ // check if this is a repeated call to access the state
+ if (this.stateTypeInfo != null && this.keyValueState != null) {
+ // repeated call
+ if (this.stateTypeInfo.equals(stateType)) {
+ // valid case, same type requested again
+ @SuppressWarnings("unchecked")
+ OperatorState<S> previous = (OperatorState<S>) this.keyValueState;
+ return previous;
+ }
+ else {
+ // invalid case, different type requested this time
+ throw new IllegalStateException("Cannot initialize key/value state for type " + stateType +
+ " ; The key/value state has already been created and initialized for a different type: " +
+ this.stateTypeInfo);
+ }
+ }
+ else {
+ // first time access to the key/value state
+ try {
+ OperatorState<S> state = operator.createKeyValueState(stateType, defaultState);
+ this.keyValueState = state;
+ this.stateTypeInfo = stateType;
+ return state;
+ }
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Cannot initialize the key/value state", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index cbf59c1..806cef2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.operators.co;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -44,8 +43,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open() throws Exception {
+ super.open();
collector = new TimestampedCollector<OUT>(output);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
new file mode 100644
index 0000000..b974674
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for key/value state implementations that are backed by a regular heap hash map. The
+ * concrete implementations define how the state is checkpointed.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> {
+
+ /** Map containing the actual key/value pairs */
+ private final HashMap<K, V> state;
+
+ /** The serializer for the keys */
+ private final TypeSerializer<K> keySerializer;
+
+ /** The serializer for the values */
+ private final TypeSerializer<V> valueSerializer;
+
+ /** The value that is returned when no other value has been associated with a key, yet */
+ private final V defaultValue;
+
+ /** The current key, which the next value methods will refer to */
+ private K currentKey;
+
+ /**
+ * Creates a new empty key/value state.
+ *
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the values.
+ * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+ */
+ protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+ TypeSerializer<V> valueSerializer,
+ V defaultValue) {
+ this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>());
+ }
+
+ /**
+ * Creates a new key/value state for the given hash map of key/value pairs.
+ *
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the values.
+ * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+ * @param state The state map to use in this key/value state. May contain initial state.
+ */
+ protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+ TypeSerializer<V> valueSerializer,
+ V defaultValue,
+ HashMap<K, V> state) {
+ this.state = requireNonNull(state);
+ this.keySerializer = requireNonNull(keySerializer);
+ this.valueSerializer = requireNonNull(valueSerializer);
+ this.defaultValue = defaultValue;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public V value() {
+ V value = state.get(currentKey);
+ return value != null ? value : defaultValue;
+ }
+
+ @Override
+ public void update(V value) {
+ if (value != null) {
+ state.put(currentKey, value);
+ }
+ else {
+ state.remove(currentKey);
+ }
+ }
+
+ @Override
+ public void setCurrentKey(K currentKey) {
+ this.currentKey = currentKey;
+ }
+
+ @Override
+ public int size() {
+ return state.size();
+ }
+
+ @Override
+ public void dispose() {
+ state.clear();
+ }
+
+ /**
+ * Gets the serializer for the keys.
+ * @return The serializer for the keys.
+ */
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ /**
+ * Gets the serializer for the values.
+ * @return The serializer for the values.
+ */
+ public TypeSerializer<V> getValueSerializer() {
+ return valueSerializer;
+ }
+
+ // ------------------------------------------------------------------------
+ // checkpointing utilities
+ // ------------------------------------------------------------------------
+
+ protected void writeStateToOutputView(final DataOutputView out) throws IOException {
+ for (Map.Entry<K, V> entry : state.entrySet()) {
+ keySerializer.serialize(entry.getKey(), out);
+ valueSerializer.serialize(entry.getValue(), out);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
deleted file mode 100644
index 14d1504..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.state.StateCheckpointer;
-
-public class BasicCheckpointer implements StateCheckpointer<Serializable, Serializable> {
-
- @Override
- public Serializable snapshotState(Serializable state, long checkpointId, long checkpointTimestamp) {
- return state;
- }
-
- @Override
- public Serializable restoreState(Serializable stateSnapshot) {
- return stateSnapshot;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
deleted file mode 100644
index 2091624..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-
-public class EagerStateStore<S, C extends Serializable> implements PartitionedStateStore<S, C> {
-
- private StateCheckpointer<S, C> checkpointer;
- private final StateHandleProvider<Serializable> provider;
-
- private Map<Serializable, S> fetchedState;
-
- @SuppressWarnings("unchecked")
- public EagerStateStore(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
- this.checkpointer = checkpointer;
- this.provider = (StateHandleProvider<Serializable>) provider;
-
- fetchedState = new HashMap<Serializable, S>();
- }
-
- @Override
- public S getStateForKey(Serializable key) throws IOException {
- return fetchedState.get(key);
- }
-
- @Override
- public void setStateForKey(Serializable key, S state) {
- fetchedState.put(key, state);
- }
-
- @Override
- public void removeStateForKey(Serializable key) {
- fetchedState.remove(key);
- }
-
- @Override
- public Map<Serializable, S> getPartitionedState() throws IOException {
- return fetchedState;
- }
-
- @Override
- public StateHandle<Serializable> snapshotStates(long checkpointId, long checkpointTimestamp) {
- // we map the values in the state-map using the state-checkpointer and store it as a checkpoint
- Map<Serializable, C> checkpoints = new HashMap<Serializable, C>();
- for (Entry<Serializable, S> stateEntry : fetchedState.entrySet()) {
- checkpoints.put(stateEntry.getKey(),
- checkpointer.snapshotState(stateEntry.getValue(), checkpointId, checkpointTimestamp));
- }
- return provider.createStateHandle((Serializable) checkpoints);
- }
-
- @Override
- public void restoreStates(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader)
- throws Exception {
-
- @SuppressWarnings("unchecked")
- Map<Serializable, C> checkpoints = (Map<Serializable, C>) snapshot.getState(userCodeClassLoader);
-
- // we map the values back to the state from the checkpoints
- for (Entry<Serializable, C> snapshotEntry : checkpoints.entrySet()) {
- fetchedState.put(snapshotEntry.getKey(), (S) checkpointer.restoreState(snapshotEntry.getValue()));
- }
- }
-
- @Override
- public boolean containsKey(Serializable key) {
- return fetchedState.containsKey(key);
- }
-
- @Override
- public void setCheckPointer(StateCheckpointer<S, C> checkpointer) {
- this.checkpointer = checkpointer;
- }
-
- @Override
- public String toString() {
- return fetchedState.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
deleted file mode 100644
index 17cb6a0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implementation of the {@link StateCheckpointer} interface for a map storing
- * types compatible with Flink's serialization system.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class KVMapCheckpointer<K, V> implements StateCheckpointer<HashMap<K, V>, byte[]> {
-
- private TypeSerializer<K> keySerializer;
- private TypeSerializer<V> valueSerializer;
-
- public KVMapCheckpointer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
- }
-
- @Override
- public byte[] snapshotState(HashMap<K, V> stateMap, long checkpointId, long checkpointTimestamp) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream(stateMap.size() * 16);
- DataOutputView out = new OutputViewDataOutputStreamWrapper(new DataOutputStream(bos));
- try {
- out.writeInt(stateMap.size());
- for (Map.Entry<K, V> kv : stateMap.entrySet()) {
- keySerializer.serialize(kv.getKey(), out);
- valueSerializer.serialize(kv.getValue(), out);
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to write snapshot", e);
- }
- return bos.toByteArray();
- }
-
- @Override
- public HashMap<K, V> restoreState(byte[] stateSnapshot) {
- ByteArrayInputView in = new ByteArrayInputView(stateSnapshot);
-
- HashMap<K, V> returnMap = new HashMap<>();
- try {
- int size = in.readInt();
- for (int i = 0; i < size; i++) {
- returnMap.put(keySerializer.deserialize(in), valueSerializer.deserialize(in));
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to read snapshot", e);
- }
-
- return returnMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
new file mode 100644
index 0000000..9c628f8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.state.OperatorState;
+
+/**
+ * Key/Value state implementation for user-defined state. The state is backed by a state
+ * backend, which typically follows one of the following patterns: Either the state is stored
+ * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
+ * state backend into some store (during checkpoints), or the key/value state is in fact backed
+ * by an external key/value store as the state backend, and checkpoints merely record the
+ * metadata of what is considered part of the checkpoint.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public interface KvState<K, V, Backend extends StateBackend<Backend>> extends OperatorState<V> {
+
+ /**
+ * Sets the current key, which will be used to retrieve values for the next calls to
+ * {@link #value()} and {@link #update(Object)}.
+ *
+ * @param key The key.
+ */
+ void setCurrentKey(K key);
+
+ /**
+ * Creates a snapshot of this state.
+ *
+ * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
+ * @param timestamp The timestamp of the checkpoint.
+ * @return A snapshot handle for this key/value state.
+ *
+ * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
+ * can react to failed snapshots.
+ */
+ KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long timestamp) throws Exception;
+
+ /**
+ * Gets the number of key/value pairs currently stored in the state. Note that if a key
+ * has been associated with "null", the key is removed from the state and will not
+ * be counted here.
+ *
+ * @return The number of key/value pairs currently stored in the state.
+ */
+ int size();
+
+ /**
+ * Disposes the key/value state, releasing all occupied resources.
+ */
+ void dispose();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
new file mode 100644
index 0000000..6aa7a1e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * This class represents a snapshot of the {@link KvState}, taken for a checkpoint. Where exactly
+ * the snapshot stores the snapshot data (in this object, in an external data store, etc) depends
+ * on the actual implementation. This snapshot defines merely how to restore the state and
+ * how to discard the state.
+ *
+ * <p>One possible implementation is that this snapshot simply contains a copy of the key/value map.
+ *
+ * <p>Another possible implementation for this snapshot is that the key/value map is serialized into
+ * a file and this snapshot object contains a pointer to that file.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ * @param <Backend> The type of the backend that can restore the state from this snapshot.
+ */
+public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> extends java.io.Serializable {
+
+ /**
+ * Loads the key/value state back from this snapshot.
+ *
+ *
+ * @param stateBackend The state backend that created this snapshot and can restore the key/value state
+ * from this snapshot.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the values.
+ * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+ * @param classLoader The class loader for user-defined types.
+ *
+ * @return An instance of the key/value state loaded from this snapshot.
+ *
+ * @throws Exception Exceptions can occur during the state loading and are forwarded.
+ */
+ KvState<K, V, Backend> restoreState(
+ Backend stateBackend,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<V> valueSerializer,
+ V defaultValue,
+ ClassLoader classLoader) throws Exception;
+
+
+ /**
+ * Discards the state snapshot, removing any resources occupied by it.
+ *
+ * @throws Exception Exceptions occurring during the state disposal should be forwarded.
+ */
+ void discardState() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
deleted file mode 100644
index 0c0b2c9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-public class OperatorStateHandle implements StateHandle<Serializable> {
-
- private static final long serialVersionUID = 1L;
-
- private final StateHandle<Serializable> handle;
- private final boolean isPartitioned;
-
- public OperatorStateHandle(StateHandle<Serializable> handle, boolean isPartitioned){
- this.handle = handle;
- this.isPartitioned = isPartitioned;
- }
-
- public boolean isPartitioned(){
- return isPartitioned;
- }
-
- @Override
- public Serializable getState(ClassLoader userCodeClassLoader) throws Exception {
- return handle.getState(userCodeClassLoader);
- }
-
- @Override
- public void discardState() throws Exception {
- handle.discardState();
- }
-
- public StateHandle<Serializable> getHandle() {
- return handle;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
deleted file mode 100644
index 34bfde7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.runtime.state.StateHandle;
-
-/**
- * Interface for storing and accessing partitioned state. The interface is
- * designed in a way that allows implementations for lazily state access.
- *
- * @param <S>
- * Type of the state.
- * @param <C>
- * Type of the state snapshot.
- */
-public interface PartitionedStateStore<S, C extends Serializable> {
-
- S getStateForKey(Serializable key) throws IOException;
-
- void setStateForKey(Serializable key, S state);
-
- void removeStateForKey(Serializable key);
-
- Map<Serializable, S> getPartitionedState() throws IOException;
-
- StateHandle<Serializable> snapshotStates(long checkpointId, long checkpointTimestamp) throws IOException;
-
- void restoreStates(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception;
-
- boolean containsKey(Serializable key);
-
- void setCheckPointer(StateCheckpointer<S, C> checkpointer);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
deleted file mode 100644
index 408a0f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * Implementation of the {@link OperatorState} interface for partitioned user
- * states. It provides methods for checkpointing and restoring partitioned
- * operator states upon failure.
- *
- * @param <IN>
- * Input type of the underlying {@link OneInputStreamOperator}
- * @param <S>
- * Type of the underlying {@link OperatorState}.
- * @param <C>
- * Type of the state snapshot.
- */
-public class PartitionedStreamOperatorState<IN, S, C extends Serializable> extends StreamOperatorState<S, C> {
-
- // KeySelector for getting the state partition key for each input
- private final KeySelector<IN, Serializable> keySelector;
-
- private final PartitionedStateStore<S, C> stateStore;
-
- private byte[] defaultState;
-
- // The currently processed input, used to extract the appropriate key
- private IN currentInput;
-
- private ClassLoader cl;
- private boolean restored = true;
- private StateHandle<Serializable> checkpoint = null;
-
- public PartitionedStreamOperatorState(StateCheckpointer<S, C> checkpointer,
- StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
- super(checkpointer, provider);
- this.keySelector = keySelector;
- this.stateStore = new EagerStateStore<S, C>(checkpointer, provider);
- this.cl = cl;
- }
-
- @SuppressWarnings("unchecked")
- public PartitionedStreamOperatorState(StateHandleProvider<C> provider,
- KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
- this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider, keySelector, cl);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public S value() throws IOException {
- if (currentInput == null) {
- throw new IllegalStateException("Need a valid input for accessing the state.");
- } else {
- if (!restored) {
- // If the state is not restored yet, restore now
- restoreWithCheckpointer();
- }
- Serializable key;
- try {
- key = keySelector.getKey(currentInput);
- } catch (Exception e) {
- throw new RuntimeException("User-defined key selector threw an exception.", e);
- }
- if (stateStore.containsKey(key)) {
- return stateStore.getStateForKey(key);
- } else {
- try {
- return (S) checkpointer.restoreState((C) InstantiationUtil.deserializeObject(
- defaultState, cl));
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not deserialize default state value.", e);
- }
- }
- }
- }
-
- @Override
- public void update(S state) throws IOException {
- if (currentInput == null) {
- throw new IllegalStateException("Need a valid input for updating a state.");
- } else {
- if (!restored) {
- // If the state is not restored yet, restore now
- restoreWithCheckpointer();
- }
- Serializable key;
- try {
- key = keySelector.getKey(currentInput);
- } catch (Exception e) {
- throw new RuntimeException("User-defined key selector threw an exception.");
- }
-
- if (state == null) {
- // Remove state if set to null
- stateStore.removeStateForKey(key);
- } else {
- stateStore.setStateForKey(key, state);
- }
- }
- }
-
- @Override
- public void setDefaultState(S defaultState) {
- try {
- this.defaultState = InstantiationUtil.serializeObject(checkpointer.snapshotState(defaultState, 0, 0));
- } catch (IOException e) {
- throw new RuntimeException("Default state must be serializable.");
- }
- }
-
- public void setCurrentInput(IN input) {
- currentInput = input;
- }
-
- @Override
- public StateHandle<Serializable> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- // If the state is restored we take a snapshot, otherwise return the last checkpoint
- return restored ? stateStore.snapshotStates(checkpointId, checkpointTimestamp) : provider
- .createStateHandle(checkpoint.getState(cl));
- }
-
- @Override
- public void restoreState(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
- // We store the snapshot for lazy restore
- checkpoint = snapshot;
- restored = false;
- }
-
- private void restoreWithCheckpointer() throws IOException {
- try {
- stateStore.restoreStates(checkpoint, cl);
- } catch (Exception e) {
- throw new IOException(e);
- }
- restored = true;
- checkpoint = null;
- }
-
- @Override
- public Map<Serializable, S> getPartitionedState() throws Exception {
- return stateStore.getPartitionedState();
- }
-
- @Override
- public void setCheckpointer(StateCheckpointer<S, C> checkpointer) {
- super.setCheckpointer(checkpointer);
- stateStore.setCheckPointer(checkpointer);
- }
-
- @Override
- public String toString() {
- return stateStore.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
new file mode 100644
index 0000000..b4fce7e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A state backend defines how state is stored and snapshotted during checkpoints.
+ *
+ * @param <Backend> The type of backend itself. This generic parameter is used to refer to the
+ * type of backend when creating state backed by this backend.
+ */
+public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
+
+ private static final long serialVersionUID = 4620413814639220247L;
+
+ // ------------------------------------------------------------------------
+ // initialization and cleanup
+ // ------------------------------------------------------------------------
+
+ /**
+ * This method is called by the task upon deployment to initialize the state backend for
+ * data for a specific job.
+ *
+ * @param job The ID of the job for which the state backend instance checkpoints data.
+ * @throws Exception Overwritten versions of this method may throw exceptions, in which
+ * case the job that uses the state backend is considered failed during
+ * deployment.
+ */
+ public abstract void initializeForJob(JobID job) throws Exception;
+
+ /**
+ * Disposes all state associated with the current job.
+ *
+ * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
+ */
+ public abstract void disposeAllStateForCurrentJob() throws Exception;
+
+ // ------------------------------------------------------------------------
+ // key/value state
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a key/value state backed by this state backend.
+ *
+ * @param keySerializer The serializer for the key.
+ * @param valueSerializer The serializer for the value.
+ * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ *
+ * @return A new key/value state backed by this backend.
+ *
+ * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
+ */
+ public abstract <K, V> KvState<K, V, Backend> createKvState(
+ TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
+ V defaultValue) throws Exception;
+
+
+ // ------------------------------------------------------------------------
+ // storing state for a checkpoint
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates an output stream that writes into the state of the given checkpoint. When the stream
+ * is closed, it returns a state handle that can retrieve the state back.
+ *
+ * @param checkpointID The ID of the checkpoint.
+ * @param timestamp The timestamp of the checkpoint.
+ * @return An output stream that writes state for the given checkpoint.
+ *
+ * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
+ */
+ public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
+ long checkpointID, long timestamp) throws Exception;
+
+
+ /**
+ * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
+ *
+ * @param state The state to be checkpointed.
+ * @param checkpointID The ID of the checkpoint.
+ * @param timestamp The timestamp of the checkpoint.
+ * @param <S> The type of the state.
+ *
+ * @return A state handle that can retrieve the checkpointed state.
+ *
+ * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
+ */
+ public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
+ S state, long checkpointID, long timestamp) throws Exception;
+
+
+ // ------------------------------------------------------------------------
+ // Checkpoint state output stream
+ // ------------------------------------------------------------------------
+
+ /**
+ * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
+ */
+ public static abstract class CheckpointStateOutputStream extends OutputStream {
+
+ /**
+ * Closes the stream and gets a state handle that can create an input stream
+ * producing the data written to this stream.
+ *
+ * @return A state handle that can create an input stream producing the data written to this stream.
+ * @throws IOException Thrown, if the stream cannot be closed.
+ */
+ public abstract StreamStateHandle closeAndGetHandle() throws IOException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
new file mode 100644
index 0000000..ad87eae
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A factory to create a specific state backend. The state backend creation gets a Configuration
+ * object that can be used to read further config values.
+ *
+ * @param <T> The type of the state backend created.
+ */
+public interface StateBackendFactory<T extends StateBackend<T>> {
+
+ /**
+ * Creates the state backend, optionally using the given configuration.
+ *
+ * @param config The Flink configuration (loaded by the TaskManager).
+ * @return The created state backend.
+ *
+ * @throws Exception Exceptions during instantiation can be forwarded.
+ */
+ StateBackend<T> createFromConfig(Configuration config) throws Exception;
+}