You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:36 UTC
[09/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index cf3dcfc..9718b72 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -28,9 +28,8 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
-import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -63,69 +62,71 @@ public class MockRuntimeContext implements RuntimeContext {
@Override
public ExecutionConfig getExecutionConfig() {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public ClassLoader getUserCodeClassLoader() {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
- public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
+ public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
+ throw new UnsupportedOperationException();
+ }
@Override
public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public Map<String, Accumulator<?, ?>> getAllAccumulators() {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public IntCounter getIntCounter(String name) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public LongCounter getLongCounter(String name) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public DoubleCounter getDoubleCounter(String name) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public Histogram getHistogram(String name) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public <RT> List<RT> getBroadcastVariable(String name) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public DistributedCache getDistributedCache() {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
- public <S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
- return null;
+ public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+ throw new UnsupportedOperationException();
}
@Override
- public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned) throws IOException {
- return null;
+ public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+ throw new UnsupportedOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
index 00c6f80..7034b11 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.collector.selector;
import java.util.ArrayList;
-import java.util.List;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -27,17 +26,16 @@ import org.apache.flink.util.Collector;
public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
private static final long serialVersionUID = 1L;
- private List<Collector<StreamRecord<OUT>>> outputs;
+
+ private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
public BroadcastOutputSelectorWrapper() {
outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
}
-
- @SuppressWarnings("unchecked,rawtypes")
+
@Override
- public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
- Collector output1 = output;
- outputs.add((Collector<StreamRecord<OUT>>) output1);
+ public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
+ outputs.add(output);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
index c6e3388..84558fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -17,16 +17,16 @@
package org.apache.flink.streaming.api.collector.selector;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,32 +38,31 @@ public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper
private List<OutputSelector<OUT>> outputSelectors;
- private Map<String, List<Collector<StreamRecord<OUT>>>> outputMap;
- private Set<Collector<StreamRecord<OUT>>> selectAllOutputs;
+ private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> outputMap;
+ private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs;
public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
this.outputSelectors = outputSelectors;
- this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>(); //new LinkedList<Collector<OUT>>();
- this.outputMap = new HashMap<String, List<Collector<StreamRecord<OUT>>>>();
+ this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>();
+ this.outputMap = new HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>>();
}
-
- @SuppressWarnings("unchecked,rawtypes")
+
@Override
- public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
- Collector output1 = output;
+ public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
List<String> selectedNames = edge.getSelectedNames();
if (selectedNames.isEmpty()) {
- selectAllOutputs.add((Collector<StreamRecord<OUT>>) output1);
- } else {
+ selectAllOutputs.add(output);
+ }
+ else {
for (String selectedName : selectedNames) {
-
if (!outputMap.containsKey(selectedName)) {
- outputMap.put(selectedName, new LinkedList<Collector<StreamRecord<OUT>>>());
- outputMap.get(selectedName).add((Collector<StreamRecord<OUT>>) output1);
- } else {
+ outputMap.put(selectedName, new ArrayList<Collector<StreamRecord<OUT>>>());
+ outputMap.get(selectedName).add(output);
+ }
+ else {
if (!outputMap.get(selectedName).contains(output)) {
- outputMap.get(selectedName).add((Collector<StreamRecord<OUT>>) output1);
+ outputMap.get(selectedName).add(output);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
index 9133ac0..f25c995 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.Collector;
public interface OutputSelectorWrapper<OUT> extends Serializable {
- public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge);
+ public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge);
public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 00991a7..7e686c7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -17,10 +17,10 @@
package org.apache.flink.streaming.api.datastream;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -172,8 +172,9 @@ public class DataStream<T> {
* The DataStreams to union output with.
* @return The {@link DataStream}.
*/
- public DataStream<T> union(DataStream<T>... streams) {
- List<StreamTransformation<T>> unionedTransforms = Lists.newArrayList();
+ @SafeVarargs
+ public final DataStream<T> union(DataStream<T>... streams) {
+ List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
unionedTransforms.add(this.transformation);
Collection<StreamTransformation<?>> thisPredecessors = this.getTransformation().getTransitivePredecessors();
@@ -185,6 +186,11 @@ public class DataStream<T> {
"This Stream: " + this.getTransformation() +
", other stream: " + newStream.getTransformation());
}
+ if (!getType().equals(newStream.getType())) {
+ throw new IllegalArgumentException("Cannot union streams of different types: "
+ + getType() + " and " + newStream.getType());
+ }
+
Collection<StreamTransformation<?>> predecessors = newStream.getTransformation().getTransitivePredecessors();
if (predecessors.contains(this.transformation) || thisPredecessors.contains(newStream.getTransformation())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index fdf398c..24104ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.datastream;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
@@ -76,7 +76,7 @@ public class DataStreamSink<T> {
* @return The sink with chaining disabled
*/
public DataStreamSink<T> disableChaining() {
- this.transformation.setChainingStrategy(AbstractStreamOperator.ChainingStrategy.NEVER);
+ this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
return this;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 0da419c..cdea910 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -63,6 +63,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
protected final KeySelector<T, KEY> keySelector;
+ protected final TypeInformation<KEY> keyType;
+
/**
* Creates a new {@link KeyedStream} using the given {@link KeySelector}
* to partition operator state by key.
@@ -73,8 +75,23 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* Function for determining state partitions
*/
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
- super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
+ this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
+ }
+
+ /**
+ * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+ * to partition operator state by key.
+ *
+ * @param dataStream
+ * Base stream of data
+ * @param keySelector
+ * Function for determining state partitions
+ */
+ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
+ super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
+ dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
this.keySelector = keySelector;
+ this.keyType = keyType;
}
@@ -95,7 +112,11 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
- ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
+ // inject the key selector and key type
+ OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
+ transform.setStateKeySelector(keySelector);
+ transform.setStateKeyType(keyType);
+
return returnStream;
}
@@ -105,6 +126,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
DataStreamSink<T> result = super.addSink(sinkFunction);
result.getTransformation().setStateKeySelector(keySelector);
+ result.getTransformation().setStateKeyType(keyType);
return result;
}
@@ -197,7 +219,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
*/
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
- clean(reducer), keySelector, getType()));
+ clean(reducer), getType().createSerializer(getExecutionConfig())));
}
/**
@@ -215,11 +237,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
*/
public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
- TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
- Utils.getCallLocationName(), true);
+ TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
+ clean(folder), getType(), Utils.getCallLocationName(), true);
- return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder),
- keySelector, initialValue, getType()));
+ return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
}
/**
@@ -454,7 +475,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
}
protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
- StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector, getType());
+ StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
+ clean(aggregate), getType().createSerializer(getExecutionConfig()));
return transform("Keyed Aggregation", getType(), operator);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 016cf5e..33d5a3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -141,7 +140,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
* @return The operator with chaining disabled
*/
public SingleOutputStreamOperator<T, O> disableChaining() {
- return setChainingStrategy(AbstractStreamOperator.ChainingStrategy.NEVER);
+ return setChainingStrategy(ChainingStrategy.NEVER);
}
/**
@@ -152,7 +151,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
* @return The operator with chaining set.
*/
public SingleOutputStreamOperator<T, O> startNewChain() {
- return setChainingStrategy(AbstractStreamOperator.ChainingStrategy.HEAD);
+ return setChainingStrategy(ChainingStrategy.HEAD);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 28410fd..26e1c9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -45,8 +45,6 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -65,6 +63,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.state.StateBackend;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.SplittableIterator;
@@ -77,7 +76,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
/**
* An ExecutionEnvironment for streaming jobs. An instance of it is
@@ -127,8 +127,9 @@ public abstract class StreamExecutionEnvironment {
protected CheckpointingMode checkpointingMode;
protected boolean forceCheckpointing = false;
-
- protected StateHandleProvider<?> stateHandleProvider;
+
+ /** The state backend used for storing k/v state and state snapshots */
+ private StateBackend<?> defaultStateBackend;
/** The time characteristic used by the data streams */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
@@ -155,8 +156,7 @@ public abstract class StreamExecutionEnvironment {
* program via the command line client from a JAR file, the default degree
* of parallelism is the one configured for that setup.
*
- * @param parallelism
- * The parallelism
+ * @param parallelism The parallelism
*/
public StreamExecutionEnvironment setParallelism(int parallelism) {
if (parallelism < 1) {
@@ -365,27 +365,40 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Sets the {@link StateHandleProvider} used for storing operator state
- * checkpoints when checkpointing is enabled.
- * <p>
- * An example would be using a {@link FileStateHandle#createProvider(String)}
- * to use any Flink supported file system as a state backend
+ * Sets the state backend that describes how to store and checkpoint operator state. It defines in
+ * what form the key/value state ({@link org.apache.flink.api.common.state.OperatorState}, accessible
+ * from operations on {@link org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained
+ * (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for
+ * the key/value state, and for checkpointed functions (implementing the interface
+ * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
+ *
+ * <p>The {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend} for example
+ * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
+ * but can checkpoint only small states (some counters).
*
+ * <p>In contrast, the {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
+ * stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated
+ * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon
+ * failures of individual nodes and that streaming programs can be executed in a highly available and
+ * strongly consistent fashion (assuming that Flink is run in high-availability mode).
+ *
+ * @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
+ *
+ * @see #getStateBackend()
*/
- public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider<?> provider) {
- this.stateHandleProvider = provider;
+ public StreamExecutionEnvironment setStateBackend(StateBackend<?> backend) {
+ this.defaultStateBackend = requireNonNull(backend);
return this;
}
/**
- * Returns the {@link org.apache.flink.runtime.state.StateHandle}
- *
- * @see #setStateHandleProvider(org.apache.flink.runtime.state.StateHandleProvider)
- *
- * @return The StateHandleProvider
+ * Returns the state backend that defines how to store and checkpoint state.
+ * @return The state backend that defines how to store and checkpoint state.
+ *
+ * @see #setStateBackend(StateBackend)
*/
- public StateHandleProvider<?> getStateHandleProvider() {
- return stateHandleProvider;
+ public StateBackend<?> getStateBackend() {
+ return defaultStateBackend;
}
/**
@@ -395,8 +408,7 @@ public abstract class StreamExecutionEnvironment {
* should be used.
*
* @param numberOfExecutionRetries
- * The number of times the system will try to re-execute failed
- * tasks.
+ * The number of times the system will try to re-execute failed tasks.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
config.setNumberOfExecutionRetries(numberOfExecutionRetries);
@@ -423,7 +435,7 @@ public abstract class StreamExecutionEnvironment {
* The delay of time the system will wait to re-execute failed
* tasks.
*/
- public void setExecutionRetryDelay(long executionRetryDelay){
+ public void setExecutionRetryDelay(long executionRetryDelay) {
config.setExecutionRetryDelay(executionRetryDelay);
}
@@ -434,7 +446,7 @@ public abstract class StreamExecutionEnvironment {
*
* @return The delay time the system will wait to re-execute failed tasks.
*/
- public long getExecutionRetryDelay(){
+ public long getExecutionRetryDelay() {
return config.getExecutionRetryDelay();
}
/**
@@ -550,7 +562,7 @@ public abstract class StreamExecutionEnvironment {
* @param characteristic The time characteristic.
*/
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
- this.timeCharacteristic = Objects.requireNonNull(characteristic);
+ this.timeCharacteristic = requireNonNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().disableTimestamps();
getConfig().setAutoWatermarkInterval(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
index 1cf5c07..504bc39 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.api.functions.sink;
import java.io.IOException;
import java.util.ArrayList;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +38,11 @@ import org.slf4j.LoggerFactory;
* Input type
*/
public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+
private static final long serialVersionUID = 1L;
+
private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
+
protected ArrayList<IN> tupleList = new ArrayList<IN>();
protected volatile OutputFormat<IN> format;
protected volatile boolean cleanupCalled = false;
@@ -51,8 +55,8 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
@Override
public void open(Configuration parameters) throws Exception {
- StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- format.configure(context.getTaskStubParameters());
+ RuntimeContext context = getRuntimeContext();
+ format.configure(parameters);
indexInSubtaskGroup = context.getIndexOfThisSubtask();
currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
index 5a9c7a8..93a91cd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.functions.sink;
import java.io.PrintStream;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
/**
* Implementation of the SinkFunction writing every tuple to the standard
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index 253c076..cc3925c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -52,7 +52,8 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
public void open(Configuration parameters) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
this.provider = context.getInputSplitProvider();
- format.configure(context.getTaskStubParameters());
+
+ format.configure(parameters);
serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
splitIterator = getInputSplits();
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index 2d74e38..14badf1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -17,24 +17,21 @@
*/
package org.apache.flink.streaming.api.functions.source;
-
-import java.io.IOException;
-
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
/**
* A stateful streaming source that emits each number from a given interval exactly once,
* possibly in parallel.
*/
-public class StatefulSequenceSource extends RichParallelSourceFunction<Long> {
+public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
+
private static final long serialVersionUID = 1L;
private final long start;
private final long end;
- private OperatorState<Long> collected;
+ private long collected;
private volatile boolean isRunning = true;
@@ -62,25 +59,28 @@ public class StatefulSequenceSource extends RichParallelSourceFunction<Long> {
((end - start + 1) % stepSize > (congruence - start)) ?
((end - start + 1) / stepSize + 1) :
((end - start + 1) / stepSize);
-
- Long currentCollected = collected.value();
+
- while (isRunning && currentCollected < toCollect) {
+ while (isRunning && collected < toCollect) {
synchronized (checkpointLock) {
- ctx.collect(currentCollected * stepSize + congruence);
- collected.update(currentCollected + 1);
+ ctx.collect(collected * stepSize + congruence);
+ collected++;
}
- currentCollected = collected.value();
}
}
-
- @Override
- public void open(Configuration conf) throws IOException{
- collected = getRuntimeContext().getOperatorState("collected", 0L, false);
- }
@Override
public void cancel() {
isRunning = false;
}
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return collected;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ collected = state;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 2c422d9..55afc93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -27,11 +27,11 @@ import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.util.ClassLoaderUtil;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.state.StateBackend;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.util.InstantiationUtil;
@@ -48,9 +48,7 @@ public class StreamConfig implements Serializable {
private static final String CHAINED_OUTPUTS = "chainedOutputs";
private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
- private static final String OUTPUT_NAME = "outputName_";
private static final String VERTEX_NAME = "vertexID";
- private static final String OPERATOR_NAME = "operatorName";
private static final String ITERATION_ID = "iterationId";
private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
private static final String SERIALIZEDUDF = "serializedUDF";
@@ -58,8 +56,7 @@ public class StreamConfig implements Serializable {
private static final String BUFFER_TIMEOUT = "bufferTimeout";
private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
- private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
- private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
+ private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
private static final String ITERATON_WAIT = "iterationWait";
private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
private static final String EDGES_IN_ORDER = "edgesInOrder";
@@ -67,10 +64,12 @@ public class StreamConfig implements Serializable {
private static final String IN_STREAM_EDGES = "inStreamEdges";
private static final String CHECKPOINTING_ENABLED = "checkpointing";
- private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
- private static final String STATE_PARTITIONER = "statePartitioner";
private static final String CHECKPOINT_MODE = "checkpointMode";
+ private static final String STATE_BACKEND = "statebackend";
+ private static final String STATE_PARTITIONER = "statePartitioner";
+ private static final String STATE_KEY_SERIALIZER = "statekeyser";
+
// ------------------------------------------------------------------------
// Default Values
@@ -97,7 +96,6 @@ public class StreamConfig implements Serializable {
// ------------------------------------------------------------------------
// Configured Properties
// ------------------------------------------------------------------------
-
public void setVertexID(Integer vertexID) {
config.setInteger(VERTEX_NAME, vertexID);
@@ -106,15 +104,7 @@ public class StreamConfig implements Serializable {
public Integer getVertexID() {
return config.getInteger(VERTEX_NAME, -1);
}
-
- public void setOperatorName(String name) {
- config.setString(OPERATOR_NAME, name);
- }
-
- public String getOperatorName() {
- return config.getString(OPERATOR_NAME, "Missing");
- }
-
+
public void setTypeSerializerIn1(TypeSerializer<?> serializer) {
setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
}
@@ -123,49 +113,29 @@ public class StreamConfig implements Serializable {
setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
}
- public void setTypeSerializerOut1(TypeSerializer<?> serializer) {
+ public void setTypeSerializerOut(TypeSerializer<?> serializer) {
setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
}
-
- public void setTypeSerializerOut2(TypeSerializer<?> serializer) {
- setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
- }
-
- @SuppressWarnings("unchecked")
+
public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
try {
- return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
- TYPE_SERIALIZER_IN_1, cl);
+ return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_1, cl);
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate serializer.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
try {
- return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
- TYPE_SERIALIZER_IN_2, cl);
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate serializer.", e);
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T> TypeSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
- try {
- return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
- TYPE_SERIALIZER_OUT_1, cl);
+ return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_2, cl);
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate serializer.", e);
}
}
-
- @SuppressWarnings("unchecked")
- public <T> TypeSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
+
+ public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
try {
- return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
- TYPE_SERIALIZER_OUT_2, cl);
+ return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate serializer.", e);
}
@@ -202,9 +172,7 @@ public class StreamConfig implements Serializable {
public <T> T getStreamOperator(ClassLoader cl) {
try {
- @SuppressWarnings("unchecked")
- T result = (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
- return result;
+ return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
}
catch (ClassNotFoundException e) {
String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
@@ -230,12 +198,10 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
try {
- return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
- OUTPUT_SELECTOR_WRAPPER, cl);
+ return InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, cl);
} catch (Exception e) {
throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e);
}
@@ -280,11 +246,10 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Cannot serialize non chained outputs.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
try {
- List<StreamEdge> nonChainedOutputs = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
+ List<StreamEdge> nonChainedOutputs = InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
return nonChainedOutputs == null ? new ArrayList<StreamEdge>() : nonChainedOutputs;
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate non chained outputs.", e);
@@ -298,11 +263,10 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Cannot serialize chained outputs.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
try {
- List<StreamEdge> chainedOutputs = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl);
+ List<StreamEdge> chainedOutputs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl);
return chainedOutputs == null ? new ArrayList<StreamEdge>() : chainedOutputs;
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate chained outputs.", e);
@@ -316,12 +280,10 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Cannot serialize outward edges.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public List<StreamEdge> getOutEdges(ClassLoader cl) {
try {
- List<StreamEdge> outEdges = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
- this.config, OUT_STREAM_EDGES, cl);
+ List<StreamEdge> outEdges = InstantiationUtil.readObjectFromConfig(this.config, OUT_STREAM_EDGES, cl);
return outEdges == null ? new ArrayList<StreamEdge>() : outEdges;
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate outputs.", e);
@@ -335,12 +297,10 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Cannot serialize inward edges.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
try {
- List<StreamEdge> inEdges = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
- this.config, IN_STREAM_EDGES, cl);
+ List<StreamEdge> inEdges = InstantiationUtil.readObjectFromConfig(this.config, IN_STREAM_EDGES, cl);
return inEdges == null ? new ArrayList<StreamEdge>() : inEdges;
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate inputs.", e);
@@ -378,12 +338,10 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not serialize outputs in order.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
try {
- List<StreamEdge> outEdgesInOrder = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
- this.config, EDGES_IN_ORDER, cl);
+ List<StreamEdge> outEdgesInOrder = InstantiationUtil.readObjectFromConfig(this.config, EDGES_IN_ORDER, cl);
return outEdgesInOrder == null ? new ArrayList<StreamEdge>() : outEdgesInOrder;
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate outputs in order.", e);
@@ -398,31 +356,31 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not serialize configuration.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
try {
- Map<Integer, StreamConfig> confs = (Map<Integer, StreamConfig>) InstantiationUtil
- .readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
+ Map<Integer, StreamConfig> confs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate configuration.", e);
}
}
- public void setStateHandleProvider(StateHandleProvider<?> provider) {
+ // ------------------------------------------------------------------------
+ // State backend
+ // ------------------------------------------------------------------------
+
+ public void setStateBackend(StateBackend<?> backend) {
try {
- InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER);
- } catch (IOException e) {
+ InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
+ } catch (Exception e) {
throw new StreamTaskException("Could not serialize stateHandle provider.", e);
}
}
-
- @SuppressWarnings("unchecked")
- public <R> StateHandleProvider<R> getStateHandleProvider(ClassLoader cl) {
+
+ public StateBackend<?> getStateBackend(ClassLoader cl) {
try {
- return (StateHandleProvider<R>) InstantiationUtil
- .readObjectFromConfig(this.config, STATEHANDLE_PROVIDER, cl);
+ return InstantiationUtil.readObjectFromConfig(this.config, STATE_BACKEND, cl);
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate statehandle provider.", e);
}
@@ -435,17 +393,35 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not serialize state partitioner.", e);
}
}
-
- @SuppressWarnings("unchecked")
+
public KeySelector<?, Serializable> getStatePartitioner(ClassLoader cl) {
try {
- return (KeySelector<?, Serializable>) InstantiationUtil
- .readObjectFromConfig(this.config, STATE_PARTITIONER, cl);
+ return InstantiationUtil.readObjectFromConfig(this.config, STATE_PARTITIONER, cl);
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate state partitioner.", e);
}
}
+
+ public void setStateKeySerializer(TypeSerializer<?> serializer) {
+ try {
+ InstantiationUtil.writeObjectToConfig(serializer, this.config, STATE_KEY_SERIALIZER);
+ } catch (IOException e) {
+ throw new StreamTaskException("Could not serialize state key serializer.", e);
+ }
+ }
+ public <K> TypeSerializer<K> getStateKeySerializer(ClassLoader cl) {
+ try {
+ return InstantiationUtil.readObjectFromConfig(this.config, STATE_KEY_SERIALIZER, cl);
+ } catch (Exception e) {
+ throw new StreamTaskException("Could not instantiate state key serializer from task config.", e);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Miscellaneous
+ // ------------------------------------------------------------------------
+
public void setChainStart() {
config.setBoolean(IS_CHAINED_VERTEX, true);
}
@@ -463,7 +439,6 @@ public class StreamConfig implements Serializable {
builder.append("\n=======================");
builder.append("Stream Config");
builder.append("=======================");
- builder.append("\nTask name: ").append(getVertexID());
builder.append("\nNumber of non-chained inputs: ").append(getNumberOfInputs());
builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 2ca82b1..4c5c19c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -43,7 +43,6 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,6 +50,7 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.state.StateBackend;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -60,9 +60,12 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.sling.commons.json.JSONException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.Objects.requireNonNull;
+
/**
* Class representing the streaming topology. It contains all the information
* necessary to build the jobgraph for the execution.
@@ -93,7 +96,7 @@ public class StreamGraph extends StreamingPlan {
protected Map<Integer, String> vertexIDtoBrokerID;
protected Map<Integer, Long> vertexIDtoLoopTimeout;
- private StateHandleProvider<?> stateHandleProvider;
+ private StateBackend<?> stateBackend;
private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
private boolean forceCheckpoint = false;
@@ -145,12 +148,12 @@ public class StreamGraph extends StreamingPlan {
this.forceCheckpoint = true;
}
- public void setStateHandleProvider(StateHandleProvider<?> provider) {
- this.stateHandleProvider = provider;
+ public void setStateBackend(StateBackend<?> backend) {
+ this.stateBackend = requireNonNull(backend);
}
- public StateHandleProvider<?> getStateHandleProvider() {
- return this.stateHandleProvider;
+ public StateBackend<?> getStateBackend() {
+ return this.stateBackend;
}
public long getCheckpointingInterval() {
@@ -392,8 +395,10 @@ public class StreamGraph extends StreamingPlan {
}
}
- public void setKey(Integer vertexID, KeySelector<?, ?> key) {
- getStreamNode(vertexID).setStatePartitioner(key);
+ public void setKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
+ StreamNode node = getStreamNode(vertexID);
+ node.setStatePartitioner(keySelector);
+ node.setStateKeySerializer(keySerializer);
}
public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 774c00b..4a87eb3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -17,8 +17,7 @@
*/
package org.apache.flink.streaming.api.graph;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
@@ -36,8 +35,10 @@ import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -97,17 +98,19 @@ public class StreamGraphGenerator {
private StreamGraphGenerator(StreamExecutionEnvironment env) {
this.streamGraph = new StreamGraph(env);
this.streamGraph.setChaining(env.isChainingEnabled());
+
if (env.getCheckpointInterval() > 0) {
this.streamGraph.setCheckpointingEnabled(true);
this.streamGraph.setCheckpointingInterval(env.getCheckpointInterval());
this.streamGraph.setCheckpointingMode(env.getCheckpointingMode());
}
- this.streamGraph.setStateHandleProvider(env.getStateHandleProvider());
+ this.streamGraph.setStateBackend(env.getStateBackend());
if (env.isForceCheckpointing()) {
this.streamGraph.forceCheckpoint();
}
+
this.env = env;
- this.alreadyTransformed = Maps.newHashMap();
+ this.alreadyTransformed = new HashMap<>();
}
/**
@@ -202,7 +205,7 @@ public class StreamGraphGenerator {
*/
private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
List<StreamTransformation<T>> inputs = union.getInputs();
- List<Integer> resultIds = Lists.newArrayList();
+ List<Integer> resultIds = new ArrayList<>();
for (StreamTransformation<T> input: inputs) {
resultIds.addAll(transform(input));
@@ -220,7 +223,7 @@ public class StreamGraphGenerator {
*/
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
StreamTransformation<T> input = partition.getInput();
- List<Integer> resultIds = Lists.newArrayList();
+ List<Integer> resultIds = new ArrayList<>();
Collection<Integer> transformedIds = transform(input);
for (Integer transformedId: transformedIds) {
@@ -273,7 +276,7 @@ public class StreamGraphGenerator {
return alreadyTransformed.get(select);
}
- List<Integer> virtualResultIds = Lists.newArrayList();
+ List<Integer> virtualResultIds = new ArrayList<>();
for (int inputId : resultIds) {
int virtualId = StreamTransformation.getNewNodeId();
@@ -301,7 +304,7 @@ public class StreamGraphGenerator {
}
StreamTransformation<T> input = iterate.getInput();
- List<Integer> resultIds = Lists.newArrayList();
+ List<Integer> resultIds = new ArrayList<>();
// first transform the input stream(s) and store the result IDs
resultIds.addAll(transform(input));
@@ -442,7 +445,8 @@ public class StreamGraphGenerator {
if (sink.getStateKeySelector() != null) {
- streamGraph.setKey(sink.getId(), sink.getStateKeySelector());
+ TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(env.getConfig());
+ streamGraph.setKey(sink.getId(), sink.getStateKeySelector(), keySerializer);
}
return Collections.emptyList();
@@ -471,7 +475,11 @@ public class StreamGraphGenerator {
transform.getName());
if (transform.getStateKeySelector() != null) {
- streamGraph.setKey(transform.getId(), transform.getStateKeySelector());
+ TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
+ streamGraph.setKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
+ }
+ if (transform.getStateKeyType() != null) {
+
}
streamGraph.setParallelism(transform.getId(), transform.getParallelism());
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 9110cd3..608e648 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -50,6 +50,7 @@ public class StreamNode implements Serializable {
private Integer slotSharingID;
private boolean isolatedSlot = false;
private KeySelector<?,?> statePartitioner;
+ private TypeSerializer<?> stateKeySerializer;
private transient StreamOperator<?> operator;
private List<OutputSelector<?>> outputSelectors;
@@ -233,6 +234,14 @@ public class StreamNode implements Serializable {
this.statePartitioner = statePartitioner;
}
+ public TypeSerializer<?> getStateKeySerializer() {
+ return stateKeySerializer;
+ }
+
+ public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
+ this.stateKeySerializer = stateKeySerializer;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d8e81cf..45cfff1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
@@ -211,13 +211,9 @@ public class StreamingJobGraphGenerator {
for (StreamEdge chainable : chainedOutputs) {
outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
}
- String returnOperatorName = operatorName + " -> ("
- + StringUtils.join(outputChainedNames, ", ") + ")";
- return returnOperatorName;
+ return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
} else if (chainedOutputs.size() == 1) {
- String returnOperatorName = operatorName + " -> "
- + chainedNames.get(chainedOutputs.get(0).getTargetId());
- return returnOperatorName;
+ return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
} else {
return operatorName;
}
@@ -249,9 +245,7 @@ public class StreamingJobGraphGenerator {
builtVertices.add(vertexID);
jobGraph.addVertex(jobVertex);
- StreamConfig retConfig = new StreamConfig(jobVertex.getConfiguration());
- retConfig.setOperatorName(chainedNames.get(vertexID));
- return retConfig;
+ return new StreamConfig(jobVertex.getConfiguration());
}
@SuppressWarnings("unchecked")
@@ -265,7 +259,7 @@ public class StreamingJobGraphGenerator {
config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
- config.setTypeSerializerOut1(vertex.getTypeSerializerOut());
+ config.setTypeSerializerOut(vertex.getTypeSerializerOut());
config.setStreamOperator(vertex.getOperator());
config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
@@ -277,13 +271,14 @@ public class StreamingJobGraphGenerator {
config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
if (streamGraph.isCheckpointingEnabled()) {
config.setCheckpointMode(streamGraph.getCheckpointingMode());
- config.setStateHandleProvider(streamGraph.getStateHandleProvider());
+ config.setStateBackend(streamGraph.getStateBackend());
} else {
// the at least once input handler is slightly cheaper (in the absence of checkpoints),
// so we use that one if checkpointing is not enabled
config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
}
config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
+ config.setStateKeySerializer(vertex.getStateKeySerializer());
Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 87041eb..e99d54d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -19,61 +19,302 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.state.KvState;
+import org.apache.flink.streaming.api.state.KvStateSnapshot;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Base class for all stream operators.
- *
- * Operators that contain a user function should extend the class
- * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
+ * Base class for all stream operators. Operators that contain a user function should extend the class
+ * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
*
+ * <p>For concrete implementations, one of the following two interfaces must also be implemented, to
+ * mark the operator as unary or binary:
+ * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
+ * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator}.
+ *
+ * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. If the timer
+ * service is used, timer callbacks are likewise guaranteed not to be called concurrently with
+ * methods on {@code StreamOperator}.
+ *
* @param <OUT> The output type of the operator
*/
-public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT> {
+public abstract class AbstractStreamOperator<OUT>
+ implements StreamOperator<OUT>, java.io.Serializable {
private static final long serialVersionUID = 1L;
+
+ /** The logger used by the operator class and its subclasses */
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
- protected transient StreamingRuntimeContext runtimeContext;
+ // ----------- configuration properties -------------
- protected transient ExecutionConfig executionConfig;
+ // A sane default for most operators
+ protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
+
+ private boolean inputCopyDisabled = false;
+
+ // ---------------- runtime fields ------------------
- protected transient Output<StreamRecord<OUT>> output;
+ /** The task that contains this operator (and other operators in the same chain) */
+ private transient StreamTask<?, ?> container;
+
+ private transient StreamConfig config;
- protected boolean inputCopyDisabled = false;
+ protected transient Output<StreamRecord<OUT>> output;
- // A sane default for most operators
- protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
+ /** The runtime context for UDFs */
+ private transient StreamingRuntimeContext runtimeContext;
+ // ---------------- key/value state ------------------
+
+ /** key selector used to get the key for the state. Non-null only if the operator uses key/value state */
+ private transient KeySelector<?, ?> stateKeySelector;
+
+ private transient KvState<?, ?, ?> keyValueState;
+
+ private transient KvStateSnapshot<?, ?, ?> keyValueStateSnapshot;
+
// ------------------------------------------------------------------------
// Life Cycle
// ------------------------------------------------------------------------
-
+
@Override
- public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+ this.container = containingTask;
+ this.config = config;
this.output = output;
- this.executionConfig = runtimeContext.getExecutionConfig();
- this.runtimeContext = runtimeContext;
+ this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
}
/**
- * This default implementation of the interface method does nothing.
+ * This method is called immediately before any elements are processed. It should contain the
+ * operator's initialization logic.
+ *
+ * <p>The default implementation does nothing.
+ *
+ * @throws Exception An exception in this method causes the operator to fail.
*/
@Override
- public void open(Configuration parameters) throws Exception {}
+ public void open() throws Exception {}
/**
- * This default implementation of the interface method does nothing.
+ * This method is called after all records have been added to the operators via the methods
+ * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
+ * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
+ * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
+
+ * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing
+ * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
+ * because the last data items are not processed properly.
+ *
+ * @throws Exception An exception in this method causes the operator to fail.
*/
@Override
public void close() throws Exception {}
+
+ /**
+ * This method is called at the very end of the operator's life, both in the case of a successful
+ * completion of the operation, and in the case of a failure and canceling.
+ *
+ * <p>This method is expected to make a thorough effort to release all resources
+ * that the operator has acquired.
+ */
+ @Override
+ public void dispose() {
+ if (keyValueState != null) {
+ keyValueState.dispose();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpointing
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+ // here, we deal with operator checkpoints and key/value state snapshots
+
+ StreamTaskState state = new StreamTaskState();
+
+ // (1) checkpoint the operator, if the operator is stateful
+
+ // (2) draw a snapshot of the key/value state
+ if (keyValueState != null) {
+ KvStateSnapshot<?, ?, ?> snapshot = keyValueState.shapshot(checkpointId, timestamp);
+ state.setKvState(snapshot);
+ }
+
+ return state;
+ }
+
+ @Override
+ public void restoreState(StreamTaskState state) throws Exception {
+ // (1) restore the operator state, if the operator is stateful
+
+ // (2) restore the key/value state. the actual restore happens lazily, when the function requests
+ // the state again, because the restore method needs information provided by the user function
+ keyValueStateSnapshot = state.getKvState();
+ }
+
+ @Override
+ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
+ // by default, nothing needs a notification of checkpoint completion
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties and Services
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the execution config defined on the execution environment of the job to which this
+ * operator belongs.
+ *
+ * @return The job's execution config.
+ */
+ public ExecutionConfig getExecutionConfig() {
+ return container.getExecutionConfig();
+ }
+
+ public StreamConfig getOperatorConfig() {
+ return config;
+ }
+
+ public StreamTask<?, ?> getContainingTask() {
+ return container;
+ }
+
+ public ClassLoader getUserCodeClassloader() {
+ return container.getUserCodeClassLoader();
+ }
+
+ /**
+ * Returns a context that allows the operator to query information about the execution and also
+ * to interact with systems such as broadcast variables and managed state. It also allows
+ * timers to be registered.
+ */
+ public StreamingRuntimeContext getRuntimeContext() {
+ return runtimeContext;
+ }
+
+ public StateBackend<?> getStateBackend() {
+ return container.getStateBackend();
+ }
/**
- * This default implementation of the interface method does nothing.
+ * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
+ * This call is guaranteed to not happen concurrently with method calls on the operator.
+ *
+ * @param time The absolute time in milliseconds.
+ * @param target The target to be triggered.
*/
+ protected void registerTimer(long time, Triggerable target) {
+ container.registerTimer(time, target);
+ }
+
+ /**
+ * Creates a key/value state handle, using the state backend configured for this task.
+ *
+ * @param stateType The type information for the state type, used for managed memory and state snapshots.
+ * @param defaultValue The default value that the state should return for keys that currently have
+ * no value associated with them
+ *
+ * @param <V> The type of the state value.
+ *
+ * @return The key/value state for this operator.
+ *
+ * @throws IllegalStateException Thrown, if the key/value state was already initialized.
+ * @throws Exception Thrown, if the state backend cannot create the key/value state.
+ */
+ protected <V> OperatorState<V> createKeyValueState(
+ TypeInformation<V> stateType, V defaultValue) throws Exception
+ {
+ return createKeyValueState(stateType.createSerializer(getExecutionConfig()), defaultValue);
+ }
+
+ /**
+ * Creates a key/value state handle, using the state backend configured for this task.
+ *
+ * @param valueSerializer The type serializer for the state type, used for managed memory and state snapshots.
+ * @param defaultValue The default value that the state should return for keys that currently have
+ * no value associated with them
+ *
+ * @param <K> The type of the state key.
+ * @param <V> The type of the state value.
+ * @param <Backend> The type of the state backend that creates the key/value state.
+ *
+ * @return The key/value state for this operator.
+ *
+ * @throws IllegalStateException Thrown, if the key/value state was already initialized.
+ * @throws Exception Thrown, if the state backend cannot create the key/value state.
+ */
+ protected <K, V, Backend extends StateBackend<Backend>> OperatorState<V> createKeyValueState(
+ TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
+ {
+ if (keyValueState != null) {
+ throw new IllegalStateException("The key/value state has already been created");
+ }
+
+ // first time state access, make sure we load the state partitioner
+ if (stateKeySelector == null) {
+ stateKeySelector = config.getStatePartitioner(getUserCodeClassloader());
+ if (stateKeySelector == null) {
+ throw new UnsupportedOperationException("The function or operator is not executed " +
+ "on a KeyedStream and can hence not access the key/value state");
+ }
+ }
+
+ // create the key and value serializers
+ TypeSerializer<K> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
+ if (keySerializer == null) {
+ throw new Exception("State key serializer has not been configured in the config.");
+ }
+
+ @SuppressWarnings("unchecked")
+ Backend stateBackend = (Backend) container.getStateBackend();
+
+ // check whether we restore the key/value state from a snapshot, or create a new blank one
+ if (keyValueStateSnapshot != null) {
+ @SuppressWarnings("unchecked")
+ KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshot;
+
+ KvState<K, V, Backend> kvstate = snapshot.restoreState(
+ stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
+ keyValueState = kvstate;
+
+ // make sure we have no redundant copies in memory, let the GC clean up
+ keyValueStateSnapshot = null;
+
+ return kvstate;
+ }
+ else {
+ // create a new blank key/value state
+ KvState<K, V, Backend> kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
+ keyValueState = kvstate;
+ return kvstate;
+ }
+ }
+
@Override
- public void dispose() {}
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void setKeyContextElement(StreamRecord record) throws Exception {
+ if (stateKeySelector != null && keyValueState != null) {
+ KvState kv = keyValueState;
+ KeySelector selector = stateKeySelector;
+ kv.setCurrentKey(selector.getKey(record.getValue()));
+ }
+ }
// ------------------------------------------------------------------------
// Context and chaining properties
@@ -83,12 +324,12 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
public final void setChainingStrategy(ChainingStrategy strategy) {
this.chainingStrategy = strategy;
}
-
+
@Override
public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}
-
+
@Override
public boolean isInputCopyingDisabled() {
return inputCopyDisabled;
@@ -96,14 +337,9 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
/**
* Enable object-reuse for this operator instance. This overrides the setting in
- * the {@link org.apache.flink.api.common.ExecutionConfig}/
+ * the {@link org.apache.flink.api.common.ExecutionConfig}.
*/
public void disableInputCopy() {
this.inputCopyDisabled = true;
}
-
- @Override
- public StreamingRuntimeContext getRuntimeContext(){
- return runtimeContext;
- }
}