You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:37 UTC
[10/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction
[FLINK-2808] [streaming] Refactor and extend state backend abstraction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/479bec0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/479bec0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/479bec0b
Branch: refs/heads/master
Commit: 479bec0b2b44315196c1f1cddeb114c79d1717db
Parents: 5ac2872
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 5 15:57:04 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200
----------------------------------------------------------------------
.../flink/storm/wrappers/BoltWrapper.java | 12 +-
.../flink/storm/wrappers/SpoutWrapper.java | 2 +-
.../storm/wrappers/WrapperSetupHelper.java | 2 +-
.../storm/api/FlinkTopologyBuilderTest.java | 6 +-
.../flink/storm/wrappers/BoltWrapperTest.java | 161 ++++---
.../flink/storm/wrappers/SpoutWrapperTest.java | 11 +-
.../storm/wrappers/WrapperSetupHelperTest.java | 12 +-
.../common/functions/AbstractRichFunction.java | 5 +-
.../functions/IterationRuntimeContext.java | 2 +-
.../api/common/functions/RichFunction.java | 28 +-
.../api/common/functions/RuntimeContext.java | 163 ++++---
.../util/AbstractRuntimeUDFContext.java | 17 +-
.../common/functions/util/ListCollector.java | 5 +
.../flink/api/common/state/OperatorState.java | 23 +-
.../api/common/state/StateCheckpointer.java | 73 ---
.../flink/configuration/ConfigConstants.java | 5 -
.../memory/InputViewDataInputStreamWrapper.java | 5 +-
.../apache/flink/util/InstantiationUtil.java | 7 +-
.../org/apache/flink/util/SerializedValue.java | 8 +
.../flink/core/testutils/CommonTestUtils.java | 21 +-
flink-dist/src/main/resources/flink-conf.yaml | 15 +-
.../flink/runtime/state/FileStateHandle.java | 31 --
.../flink/runtime/state/LocalStateHandle.java | 15 +-
.../runtime/state/StateHandleProvider.java | 39 --
.../apache/flink/runtime/taskmanager/Task.java | 2 -
flink-staging/flink-fs-tests/pom.xml | 19 +
.../flink/hdfstests/FileStateBackendTest.java | 308 +++++++++++++
.../flink/hdfstests/FileStateHandleTest.java | 126 ------
.../kafka/testutils/MockRuntimeContext.java | 37 +-
.../BroadcastOutputSelectorWrapper.java | 12 +-
.../selector/DirectedOutputSelectorWrapper.java | 33 +-
.../selector/OutputSelectorWrapper.java | 2 +-
.../streaming/api/datastream/DataStream.java | 12 +-
.../api/datastream/DataStreamSink.java | 4 +-
.../streaming/api/datastream/KeyedStream.java | 38 +-
.../datastream/SingleOutputStreamOperator.java | 7 +-
.../environment/StreamExecutionEnvironment.java | 64 +--
.../api/functions/sink/FileSinkFunction.java | 10 +-
.../api/functions/sink/PrintSinkFunction.java | 2 +-
.../functions/source/FileSourceFunction.java | 5 +-
.../source/StatefulSequenceSource.java | 36 +-
.../flink/streaming/api/graph/StreamConfig.java | 149 +++----
.../flink/streaming/api/graph/StreamGraph.java | 21 +-
.../api/graph/StreamGraphGenerator.java | 28 +-
.../flink/streaming/api/graph/StreamNode.java | 9 +
.../api/graph/StreamingJobGraphGenerator.java | 19 +-
.../api/operators/AbstractStreamOperator.java | 296 ++++++++++--
.../operators/AbstractUdfStreamOperator.java | 163 ++++---
.../api/operators/ChainingStrategy.java | 47 ++
.../api/operators/OneInputStreamOperator.java | 4 +-
.../flink/streaming/api/operators/Output.java | 2 +-
.../api/operators/StatefulStreamOperator.java | 40 --
.../streaming/api/operators/StreamFlatMap.java | 5 +-
.../api/operators/StreamGroupedFold.java | 47 +-
.../api/operators/StreamGroupedReduce.java | 54 +--
.../streaming/api/operators/StreamOperator.java | 92 ++--
.../streaming/api/operators/StreamProject.java | 5 +-
.../streaming/api/operators/StreamSource.java | 3 +-
.../api/operators/StreamingRuntimeContext.java | 162 +++++++
.../api/operators/co/CoStreamFlatMap.java | 5 +-
.../api/state/AbstractHeapKvState.java | 145 ++++++
.../streaming/api/state/BasicCheckpointer.java | 37 --
.../streaming/api/state/EagerStateStore.java | 104 -----
.../streaming/api/state/KVMapCheckpointer.java | 82 ----
.../flink/streaming/api/state/KvState.java | 69 +++
.../streaming/api/state/KvStateSnapshot.java | 69 +++
.../api/state/OperatorStateHandle.java | 54 ---
.../api/state/PartitionedStateStore.java | 55 ---
.../state/PartitionedStreamOperatorState.java | 182 --------
.../flink/streaming/api/state/StateBackend.java | 135 ++++++
.../api/state/StateBackendFactory.java | 40 ++
.../api/state/StreamOperatorState.java | 132 ------
.../streaming/api/state/StreamStateHandle.java | 28 ++
.../streaming/api/state/WrapperStateHandle.java | 61 ---
.../api/state/filesystem/AbstractFileState.java | 83 ++++
.../filesystem/FileSerializableStateHandle.java | 53 +++
.../state/filesystem/FileStreamStateHandle.java | 46 ++
.../api/state/filesystem/FsHeapKvState.java | 88 ++++
.../state/filesystem/FsHeapKvStateSnapshot.java | 95 ++++
.../api/state/filesystem/FsStateBackend.java | 409 +++++++++++++++++
.../state/filesystem/FsStateBackendFactory.java | 56 +++
.../api/state/memory/ByteStreamStateHandle.java | 52 +++
.../api/state/memory/MemHeapKvState.java | 52 +++
.../state/memory/MemoryHeapKvStateSnapshot.java | 102 +++++
.../api/state/memory/MemoryStateBackend.java | 206 +++++++++
.../api/state/memory/SerializedStateHandle.java | 49 ++
.../CoFeedbackTransformation.java | 4 +-
.../transformations/FeedbackTransformation.java | 4 +-
.../transformations/OneInputTransformation.java | 18 +-
.../PartitionTransformation.java | 6 +-
.../transformations/SelectTransformation.java | 9 +-
.../api/transformations/SinkTransformation.java | 15 +-
.../transformations/SourceTransformation.java | 4 +-
.../transformations/SplitTransformation.java | 4 +-
.../transformations/StreamTransformation.java | 5 +-
.../transformations/TwoInputTransformation.java | 4 +-
.../transformations/UnionTransformation.java | 4 +-
.../streaming/runtime/io/CollectorWrapper.java | 18 +-
.../runtime/io/StreamInputProcessor.java | 9 +-
.../operators/BucketStreamSortOperator.java | 18 +-
.../operators/ExtractTimestampsOperator.java | 14 +-
...ractAlignedProcessingTimeWindowOperator.java | 9 +-
.../windowing/NonKeyedWindowOperator.java | 28 +-
.../operators/windowing/WindowOperator.java | 29 +-
.../ExceptionInChainedOperatorException.java | 45 ++
.../runtime/tasks/OneInputStreamTask.java | 14 +-
.../streaming/runtime/tasks/OperatorChain.java | 308 +++++++++++++
.../streaming/runtime/tasks/OutputHandler.java | 336 --------------
.../runtime/tasks/SourceStreamTask.java | 12 +-
.../runtime/tasks/StreamIterationHead.java | 8 +-
.../runtime/tasks/StreamIterationTail.java | 6 +-
.../streaming/runtime/tasks/StreamTask.java | 447 ++++++++++---------
.../runtime/tasks/StreamTaskState.java | 108 +++++
.../runtime/tasks/StreamTaskStateList.java | 60 +++
.../runtime/tasks/StreamingRuntimeContext.java | 204 ---------
.../runtime/tasks/TwoInputStreamTask.java | 15 +-
.../streaming/api/AggregationFunctionTest.java | 31 +-
.../flink/streaming/api/DataStreamTest.java | 5 +-
.../api/functions/PrintSinkFunctionTest.java | 2 +-
.../api/graph/StreamGraphGeneratorTest.java | 15 +-
.../api/operators/StreamGroupedFoldTest.java | 28 +-
.../api/operators/StreamGroupedReduceTest.java | 17 +-
.../api/state/FileStateBackendTest.java | 419 +++++++++++++++++
.../api/state/MemoryStateBackendTest.java | 278 ++++++++++++
.../streaming/api/state/StateHandleTest.java | 135 ------
.../api/state/StatefulOperatorTest.java | 377 ----------------
...AlignedProcessingTimeWindowOperatorTest.java | 209 +++++----
...AlignedProcessingTimeWindowOperatorTest.java | 201 +++++----
.../runtime/tasks/StreamTaskTestHarness.java | 13 +-
.../runtime/tasks/StreamTaskTimerITCase.java | 17 +-
.../streaming/timestamp/TimestampITCase.java | 4 +-
.../flink/streaming/util/MockContext.java | 74 ++-
.../util/OneInputStreamOperatorTestHarness.java | 71 +--
.../streaming/util/SourceFunctionUtil.java | 19 +-
.../util/TwoInputStreamOperatorTestHarness.java | 69 +--
.../flink/streaming/api/scala/DataStream.scala | 88 +---
.../flink/streaming/api/scala/KeyedStream.scala | 106 ++++-
.../api/scala/StreamExecutionEnvironment.scala | 39 +-
.../api/scala/function/StatefulFunction.scala | 16 +-
.../streaming/api/scala/DataStreamTest.scala | 77 ++--
.../streaming/api/scala/StateTestPrograms.scala | 23 +-
.../CoStreamCheckpointingITCase.java | 73 +--
.../PartitionedStateCheckpointingITCase.java | 52 +--
.../checkpointing/StateCheckpoinedITCase.java | 21 +-
.../StreamCheckpointNotifierITCase.java | 61 ++-
.../StreamCheckpointingITCase.java | 120 ++---
.../UdfStreamOperatorCheckpointingITCase.java | 50 ++-
.../test/classloading/ClassLoaderITCase.java | 4 +-
.../ProcessFailureStreamingRecoveryITCase.java | 38 +-
149 files changed, 5747 insertions(+), 3790 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index b16fc09..f0913e8 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -62,11 +61,12 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
private final Fields inputSchema;
/** The original Storm topology. */
protected StormTopology stormTopology;
+
/**
* We have to use this because Operators must output
* {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
*/
- private TimestampedCollector<OUT> flinkCollector;
+ private transient TimestampedCollector<OUT> flinkCollector;
/**
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
@@ -206,8 +206,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
}
@Override
- public void open(final Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open() throws Exception {
+ super.open();
this.flinkCollector = new TimestampedCollector<OUT>(output);
OutputCollector stormCollector = null;
@@ -217,7 +217,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
this.numberOfAttributes, flinkCollector));
}
- GlobalJobParameters config = super.executionConfig.getGlobalJobParameters();
+ GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();
if (config != null) {
@@ -229,7 +229,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
}
final TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(
- super.runtimeContext, this.bolt, this.stormTopology, stormConfig);
+ getRuntimeContext(), this.bolt, this.stormTopology, stormConfig);
this.bolt.prepare(stormConfig, topologyContext, stormCollector);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index 914a19d..e78dd5c 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.storm.util.FiniteSpout;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import com.google.common.collect.Sets;
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
index d529b6a..5f1f142 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
@@ -29,7 +29,7 @@ import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import clojure.lang.Atom;
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
index e6fb8e5..906d081 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.flink.storm.api;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.storm.util.TestDummySpout;
import org.apache.flink.storm.util.TestSink;
+
+import org.junit.Ignore;
import org.junit.Test;
import backtype.storm.tuple.Fields;
@@ -52,6 +54,7 @@ public class FlinkTopologyBuilderTest {
}
@Test
+ @Ignore
public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
@@ -63,6 +66,7 @@ public class FlinkTopologyBuilderTest {
}
@Test
+ @Ignore
public void testFieldsGroupingOnMultipleBoltOutputStreams() {
FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index e33fdb9..c1485c8 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -29,18 +29,18 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.storm.util.TestDummyBolt;
-import org.apache.flink.storm.wrappers.BoltWrapper;
-import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
-import org.apache.flink.storm.wrappers.StormTuple;
-import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -139,7 +139,6 @@ public class BoltWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
when(taskContext.getTaskName()).thenReturn("name");
final IRichBolt bolt = mock(IRichBolt.class);
@@ -149,8 +148,8 @@ public class BoltWrapperTest extends AbstractTest {
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null);
- wrapper.setup(mock(Output.class), taskContext);
- wrapper.open(null);
+ wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+ wrapper.open();
wrapper.processElement(record);
if (numberOfAttributes == -1) {
@@ -169,11 +168,6 @@ public class BoltWrapperTest extends AbstractTest {
final StreamRecord record = mock(StreamRecord.class);
when(record.getValue()).thenReturn(2).thenReturn(3);
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
final Output output = mock(Output.class);
final TestBolt bolt = new TestBolt();
@@ -186,8 +180,8 @@ public class BoltWrapperTest extends AbstractTest {
}
final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null, raw);
- wrapper.setup(output, taskContext);
- wrapper.open(null);
+ wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output);
+ wrapper.open();
final SplitStreamType splitRecord = new SplitStreamType<Integer>();
if (rawOutType1) {
@@ -214,86 +208,70 @@ public class BoltWrapperTest extends AbstractTest {
@SuppressWarnings("unchecked")
@Test
public void testOpen() throws Exception {
- final StormConfig stormConfig = new StormConfig();
- final Configuration flinkConfig = new Configuration();
-
- final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
- when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
- .thenReturn(flinkConfig);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
+
+ // utility mocks
final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+ // (1) open with no configuration
+ {
+ ExecutionConfig execConfig = mock(ExecutionConfig.class);
+ when(execConfig.getGlobalJobParameters()).thenReturn(null);
+
+ final IRichBolt bolt = mock(IRichBolt.class);
+ BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+ wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+
+ wrapper.open();
+ verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+ }
- final IRichBolt bolt = mock(IRichBolt.class);
+ // (2) open with a storm specific configuration
+ {
+ final StormConfig stormConfig = new StormConfig();
- BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
- wrapper.setup(mock(Output.class), taskContext);
+ ExecutionConfig execConfig = mock(ExecutionConfig.class);
+ when(execConfig.getGlobalJobParameters()).thenReturn(stormConfig);
+
+ final IRichBolt bolt = mock(IRichBolt.class);
+ BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+ wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+
+ wrapper.open();
+ verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class));
+ }
- // test without configuration
- wrapper.open(null);
- verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+ // (3) open with a flink config
+ {
+ final Configuration cfg = new Configuration();
+ cfg.setString("foo", "bar");
+ cfg.setInteger("the end (the int)", Integer.MAX_VALUE);
- // test with StormConfig
- wrapper.open(null);
- verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
- any(OutputCollector.class));
+ ExecutionConfig execConfig = mock(ExecutionConfig.class);
+ when(execConfig.getGlobalJobParameters()).thenReturn(new UnmodifiableConfiguration(cfg));
- // test with Configuration
- final TestDummyBolt testBolt = new TestDummyBolt();
- wrapper = new BoltWrapper<Object, Object>(testBolt);
- wrapper.setup(mock(Output.class), taskContext);
+ TestDummyBolt testBolt = new TestDummyBolt();
+ BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(testBolt);
+ wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
- wrapper.open(null);
- for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
- Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+ wrapper.open();
+ for (Entry<String, String> entry : cfg.toMap().entrySet()) {
+ Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+ }
}
}
@SuppressWarnings("unchecked")
@Test
public void testOpenSink() throws Exception {
- final StormConfig stormConfig = new StormConfig();
- final Configuration flinkConfig = new Configuration();
-
- final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
- when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
- .thenReturn(flinkConfig);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
final IRichBolt bolt = mock(IRichBolt.class);
-
BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
- wrapper.setup(mock(Output.class), taskContext);
-
- // test without configuration
- wrapper.open(null);
- verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
- isNull(OutputCollector.class));
-
- // test with StormConfig
- wrapper.open(null);
- verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
- isNull(OutputCollector.class));
-
- // test with Configuration
- final TestDummyBolt testBolt = new TestDummyBolt();
- wrapper = new BoltWrapper<Object, Object>(testBolt);
- wrapper.setup(mock(Output.class), taskContext);
-
- wrapper.open(null);
- for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
- Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
- }
+
+ wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+ wrapper.open();
+
+ verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
}
@SuppressWarnings("unchecked")
@@ -306,9 +284,8 @@ public class BoltWrapperTest extends AbstractTest {
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- wrapper.setup(mock(Output.class), taskContext);
+
+ wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
wrapper.close();
wrapper.dispose();
@@ -351,5 +328,25 @@ public class BoltWrapperTest extends AbstractTest {
}
}
-
+ public static StreamTask<?, ?> createMockStreamTask() {
+ return createMockStreamTask(new ExecutionConfig());
+ }
+
+ public static StreamTask<?, ?> createMockStreamTask(ExecutionConfig execConfig) {
+ Environment env = mock(Environment.class);
+ when(env.getTaskName()).thenReturn("Mock Task");
+ when(env.getTaskNameWithSubtasks()).thenReturn("Mock Task (1/1)");
+ when(env.getIndexInSubtaskGroup()).thenReturn(0);
+ when(env.getNumberOfSubtasks()).thenReturn(1);
+ when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
+
+ StreamTask<?, ?> mockTask = mock(StreamTask.class);
+ when(mockTask.getName()).thenReturn("Mock Task (1/1)");
+ when(mockTask.getCheckpointLock()).thenReturn(new Object());
+ when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
+ when(mockTask.getEnvironment()).thenReturn(env);
+ when(mockTask.getExecutionConfig()).thenReturn(execConfig);
+
+ return mockTask;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
index 227d736..b81b775 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -30,11 +30,8 @@ import org.apache.flink.storm.util.FiniteSpout;
import org.apache.flink.storm.util.FiniteTestSpout;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.storm.util.TestDummySpout;
-import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
-import org.apache.flink.storm.wrappers.SpoutWrapper;
-import org.apache.flink.storm.wrappers.WrapperSetupHelper;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -72,7 +69,6 @@ public class SpoutWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = mock(IRichSpout.class);
@@ -112,7 +108,6 @@ public class SpoutWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = mock(IRichSpout.class);
@@ -136,7 +131,6 @@ public class SpoutWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
when(taskContext.getTaskName()).thenReturn("name");
final FiniteTestSpout spout = new FiniteTestSpout(numberOfCalls);
@@ -158,7 +152,6 @@ public class SpoutWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
when(taskContext.getTaskName()).thenReturn("name");
final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
@@ -176,7 +169,6 @@ public class SpoutWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
when(taskContext.getTaskName()).thenReturn("name");
final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
@@ -192,7 +184,6 @@ public class SpoutWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index c3b0300..20e480d 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -39,9 +39,7 @@ import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.storm.util.TestDummySpout;
import org.apache.flink.storm.util.TestSink;
-import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
-import org.apache.flink.storm.wrappers.WrapperSetupHelper;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -181,9 +179,9 @@ public class WrapperSetupHelperTest extends AbstractTest {
builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
- .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
+ .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
- .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
+ .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
int counter = 0;
@@ -207,9 +205,9 @@ public class WrapperSetupHelperTest extends AbstractTest {
flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
- .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
+ .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
- .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
+ .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
flinkBuilder.createTopology();
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index 5a019aa..fd9de67 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -38,10 +38,12 @@ public abstract class AbstractRichFunction implements RichFunction, Serializable
private transient RuntimeContext runtimeContext;
+ @Override
public void setRuntimeContext(RuntimeContext t) {
this.runtimeContext = t;
}
-
+
+ @Override
public RuntimeContext getRuntimeContext() {
if (this.runtimeContext != null) {
return this.runtimeContext;
@@ -50,6 +52,7 @@ public abstract class AbstractRichFunction implements RichFunction, Serializable
}
}
+ @Override
public IterationRuntimeContext getIterationRuntimeContext() {
if (this.runtimeContext == null) {
throw new IllegalStateException("The runtime context has not been initialized.");
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
index 73e738e..8239921 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.types.Value;
/**
- *
+ *
*/
public interface IterationRuntimeContext extends RuntimeContext {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index 0685f63..0cbde4a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -36,7 +36,7 @@ public interface RichFunction extends Function {
* The configuration contains all parameters that were configured on the function in the program
* composition.
*
- * <pre><blockquote>
+ * <pre>{@code
* public class MyMapper extends FilterFunction<String> {
*
* private String searchString;
@@ -49,7 +49,7 @@ public interface RichFunction extends Function {
* return value.equals(searchString);
* }
* }
- * </blockquote></pre>
+ * }</pre>
* <p>
* By default, this method does nothing.
*
@@ -64,7 +64,7 @@ public interface RichFunction extends Function {
void open(Configuration parameters) throws Exception;
/**
- * Teardown method for the user code. It is called after the last call to the main working methods
+ * Tear-down method for the user code. It is called after the last call to the main working methods
* (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration, this method will
* be invoked after each iteration superstep.
* <p>
@@ -76,16 +76,32 @@ public interface RichFunction extends Function {
*/
void close() throws Exception;
+ // ------------------------------------------------------------------------
+ // Runtime context
+ // ------------------------------------------------------------------------
/**
- * Gets the context that contains information about the UDF's runtime.
+ * Gets the context that contains information about the UDF's runtime, such as the
+ * parallelism of the function, the subtask index of the function, or the name of
+ * the task that executes the function.
*
- * Context information are for example {@link org.apache.flink.api.common.accumulators.Accumulator}s
- * or the {@link org.apache.flink.api.common.cache.DistributedCache}.
+ * <p>The RuntimeContext also gives access to the
+ * {@link org.apache.flink.api.common.accumulators.Accumulator}s and the
+ * {@link org.apache.flink.api.common.cache.DistributedCache}.
*
* @return The UDF's runtime context.
*/
RuntimeContext getRuntimeContext();
+
+ /**
+ * Gets a specialized version of the {@link RuntimeContext}, which has additional information
+ * about the iteration in which the function is executed. This IterationRuntimeContext is only
+ * available if the function is part of an iteration. Otherwise, this method throws an exception.
+ *
+ * @return The IterationRuntimeContext.
+ * @throws java.lang.IllegalStateException Thrown, if the function is not executed as part of an iteration.
+ */
+ IterationRuntimeContext getIterationRuntimeContext();
/**
* Sets the function's runtime context. Called by the framework when creating a parallel instance of the function.
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 289f063..cadef36 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.common.functions;
-import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -31,7 +30,7 @@ import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
@@ -82,11 +81,7 @@ public interface RuntimeContext {
// --------------------------------------------------------------------------------------------
/**
- * Add this accumulator. Throws an exception if the counter is already
- * existing.
- *
- * This is only needed to support generic accumulators (e.g. for
- * Set<String>). Didn't find a way to get this work with getAccumulator.
+ * Add this accumulator. Throws an exception if the accumulator already exists.
*/
<V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator);
@@ -169,65 +164,101 @@ public interface RuntimeContext {
// --------------------------------------------------------------------------------------------
/**
- * Returns the {@link OperatorState} with the given name of the underlying
- * operator instance, which can be used to store and update user state in a
- * fault tolerant fashion. The state will be initialized by the provided
- * default value, and the {@link StateCheckpointer} will be used to draw the
- * state snapshots.
+ * Gets the key/value state, which is only accessible if the function is executed on
+ * a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will
+ * return the value bound to the key of the element currently processed by the function.
+ *
+ * <p>Because the scope of each value is the key of the currently processed element,
+ * and the elements are distributed by the Flink runtime, the system can transparently
+ * scale out and redistribute the state and KeyedStream.
+ *
+ * <p>The following code example shows how to implement a continuous counter that counts
+ * how many times elements of a certain key occur, and emits an updated count for that
+ * element on each occurrence.
+ *
+ * <pre>{@code
+ * DataStream<MyType> stream = ...;
+ * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+ *
+ * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+ *
+ * private OperatorState<Long> state;
+ *
+ * public void open(Configuration cfg) {
+ * state = getRuntimeContext().getKeyValueState(Long.class, 0L);
+ * }
+ *
+ * public Tuple2<MyType, Long> map(MyType value) {
+ * long count = state.value();
+ * state.update(count + 1);
+ * return new Tuple2<>(value, count);
+ * }
+ * });
+ *
+ * }</pre>
+ *
+ * <p>This method attempts to deduce the type information from the given type class. If the
+ * full type cannot be determined from the class (for example because of generic parameters),
+ * the TypeInformation object must be manually passed via
+ * {@link #getKeyValueState(TypeInformation, Object)}.
*
- * <p>
- * When storing a {@link Serializable} state the user can omit the
- * {@link StateCheckpointer} in which case the full state will be written as
- * the snapshot.
- * </p>
- *
- * @param name
- * Identifier for the state allowing that more operator states
- * can be used by the same operator.
- * @param defaultState
- * Default value for the operator state. This will be returned
- * the first time {@link OperatorState#value()} (for every
- * state partition) is called before
- * {@link OperatorState#update(Object)}.
- * @param partitioned
- * Sets whether partitioning should be applied for the given
- * state. If true a partitioner key must be used.
- * @param checkpointer
- * The {@link StateCheckpointer} that will be used to draw
- * snapshots from the user state.
- * @return The {@link OperatorState} for the underlying operator.
- *
- * @throws IOException Thrown if the system cannot access the state.
- */
- <S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
- boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException;
-
- /**
- * Returns the {@link OperatorState} with the given name of the underlying
- * operator instance, which can be used to store and update user state in a
- * fault tolerant fashion. The state will be initialized by the provided
- * default value.
+ * @param stateType The class of the type that is stored in the state. Used to generate
+ * serializers for managed memory and checkpointing.
+ * @param defaultState The default state value, returned when the state is accessed and
+ * no value has yet been set for the key. May be null.
+ * @param <S> The type of the state.
+ *
+ * @return The key/value state access.
+ *
+ * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
+ * function (function is not part of a KeyedStream).
+ */
+ <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState);
+
+ /**
+ * Gets the key/value state, which is only accessible if the function is executed on
+ * a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will
+ * return the value bound to the key of the element currently processed by the function.
*
- * <p>
- * When storing a non-{@link Serializable} state the user needs to specify a
- * {@link StateCheckpointer} for drawing snapshots.
- * </p>
- *
- * @param name
- * Identifier for the state allowing that more operator states
- * can be used by the same operator.
- * @param defaultState
- * Default value for the operator state. This will be returned
- * the first time {@link OperatorState#value()} (for every
- * state partition) is called before
- * {@link OperatorState#update(Object)}.
- * @param partitioned
- * Sets whether partitioning should be applied for the given
- * state. If true a partitioner key must be used.
- * @return The {@link OperatorState} for the underlying operator.
- *
- * @throws IOException Thrown if the system cannot access the state.
- */
- <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
- boolean partitioned) throws IOException;
+ * <p>Because the scope of each value is the key of the currently processed element,
+ * and the elements are distributed by the Flink runtime, the system can transparently
+ * scale out and redistribute the state and KeyedStream.
+ *
+ * <p>The following code example shows how to implement a continuous counter that counts
+ * how many times elements of a certain key occur, and emits an updated count for that
+ * element on each occurrence.
+ *
+ * <pre>{@code
+ * DataStream<MyType> stream = ...;
+ * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+ *
+ * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+ *
+ * private OperatorState<Long> state;
+ *
+ * public void open(Configuration cfg) {
+ * state = getRuntimeContext().getKeyValueState(Long.class, 0L);
+ * }
+ *
+ * public Tuple2<MyType, Long> map(MyType value) {
+ * long count = state.value();
+ * state.update(count + 1);
+ * return new Tuple2<>(value, count);
+ * }
+ * });
+ *
+ * }</pre>
+ *
+ * @param stateType The type information for the type that is stored in the state.
+ * Used to create serializers for managed memory and checkpoints.
+ * @param defaultState The default state value, returned when the state is accessed and
+ * no value has yet been set for the key. May be null.
+ * @param <S> The type of the state.
+ *
+ * @return The key/value state access.
+ *
+ * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
+ * function (function is not part of a KeyedStream).
+ */
+ <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 71be1e1..90d23cd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.common.functions.util;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
@@ -35,7 +34,7 @@ import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
/**
@@ -164,16 +163,16 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
}
return (Accumulator<V, A>) accumulator;
}
-
+
@Override
- public <S, C extends Serializable> OperatorState<S> getOperatorState(String name,
- S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
- throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
+ public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+ throw new UnsupportedOperationException(
+ "This state is only accessible by functions executed on a KeyedStream");
}
@Override
- public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
- boolean partitioned) throws IOException{
- throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
+ public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+ throw new UnsupportedOperationException(
+ "This state is only accessible by functions executed on a KeyedStream");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
index a3a369b..12d9fda 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
@@ -22,6 +22,11 @@ import java.util.List;
import org.apache.flink.util.Collector;
+/**
+ * A {@link Collector} that puts the collected elements into a given list.
+ *
+ * @param <T> The type of the collected elements.
+ */
public class ListCollector<T> implements Collector<T> {
private final List<T> list;
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 3036023..136b6f8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -20,24 +20,17 @@ package org.apache.flink.api.common.state;
import java.io.IOException;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-
/**
- * Base interface for all streaming operator states. It can represent both
- * partitioned (when state partitioning is defined in the program) or
- * non-partitioned user states.
+ * This state interface abstracts persistent key/value state in streaming programs.
+ * The state is accessed and modified by user functions, and checkpointed consistently
+ * by the system as part of the distributed snapshots.
*
- * State can be accessed and manipulated using the {@link #value()} and
- * {@link #update(T)} methods. These calls are only safe in the
- * transformation call the operator represents, for instance inside
- * {@link MapFunction#map(Object)} and can lead tp unexpected behavior in the
- * {@link AbstractRichFunction#open(Configuration)} or
- * {@link AbstractRichFunction#close()} methods.
+ * <p>The state is only accessible by functions applied on a KeyedStream. The key is
+ * automatically supplied by the system, so the function always sees the value mapped to the
+ * key of the current element. That way, the system can handle stream and state partitioning
+ * consistently together.
*
- * @param <T>
- * Type of the operator state
+ * @param <T> Type of the value in the operator state
*/
public interface OperatorState<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
deleted file mode 100644
index f373846..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import java.io.Serializable;
-
-/**
- * Basic interface for creating {@link OperatorState} snapshots in stateful
- * streaming programs.
- *
- * The user needs to implement the {@link #snapshotState(S, long, long)} and
- * {@link #restoreState(C)} methods that will be called to create and restore
- * state snapshots of the given states.
- *
- * <p>
- * Note that the {@link OperatorState} is <i>synchronously</i> checkpointed.
- * While the state is written, the state cannot be accessed or modified so the
- * function needs not return a copy of its state, but may return a reference to
- * its state.
- * </p>
- *
- * @param <S>
- * Type of the operator state.
- * @param <C>
- * Type of the snapshot that will be persisted.
- */
-public interface StateCheckpointer<S, C extends Serializable> {
-
- /**
- * Takes a snapshot of a given operator state. The snapshot returned will be
- * persisted in the state backend for this job and restored upon failure.
- * This method is called for all state partitions in case of partitioned
- * state when creating a checkpoint.
- *
- * @param state
- * The state for which the snapshot needs to be taken
- * @param checkpointId
- * The ID of the checkpoint.
- * @param checkpointTimestamp
- * The timestamp of the checkpoint, as derived by
- * System.currentTimeMillis() on the JobManager.
- *
- * @return A snapshot of the operator state.
- */
- C snapshotState(S state, long checkpointId, long checkpointTimestamp);
-
- /**
- * Restores the operator states from a given snapshot. The restores state
- * will be loaded back to the function. In case of partitioned state, each
- * partition is restored independently.
- *
- * @param stateSnapshot
- * The state snapshot that needs to be restored.
- * @return The state corresponding to the snapshot.
- */
- S restoreState(C stateSnapshot);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 36369ab..b1ffdd8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -415,11 +415,6 @@ public final class ConfigConstants {
*/
public static final String STATE_BACKEND = "state.backend";
- /**
- * Directory for saving streaming checkpoints
- */
- public static final String STATE_BACKEND_FS_DIR = "state.backend.fs.checkpointdir";
-
// ----------------------------- Miscellaneous ----------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
index 7de1d71..b4dffb1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
@@ -121,9 +121,10 @@ public class InputViewDataInputStreamWrapper implements DataInputView, Closeable
public double readDouble() throws IOException {
return in.readDouble();
}
-
- @SuppressWarnings("deprecation")
+
@Override
+ @Deprecated
+ @SuppressWarnings("deprecation")
public String readLine() throws IOException {
return in.readLine();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index de04dc4..8ce3e85 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -246,7 +246,7 @@ public final class InstantiationUtil {
}
}
- public static Object readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
+ public static <T> T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
byte[] bytes = config.getBytes(key, null);
if (bytes == null) {
return null;
@@ -284,13 +284,14 @@ public final class InstantiationUtil {
return serializer.deserialize(record, inputViewWrapper);
}
- public static Object deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
+ @SuppressWarnings("unchecked")
+ public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
ObjectInputStream oois = null;
final ClassLoader old = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(cl);
oois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), cl);
- return oois.readObject();
+ return (T) oois.readObject();
} finally {
Thread.currentThread().setContextClassLoader(old);
if (oois != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
index 5731fc1..504e458 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
@@ -55,6 +55,14 @@ public class SerializedValue<T> implements java.io.Serializable {
return serializedData == null ? null : (T) InstantiationUtil.deserializeObject(serializedData, loader);
}
+ /**
+ * Gets the size of the serialized state.
+ * @return The size of the serialized state.
+ */
+ public int getSizeOfSerializedState() {
+ return serializedData.length;
+ }
+
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 5b7afaa..4dbf04c 100644
--- a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.core.testutils;
import static org.junit.Assert.fail;
@@ -37,8 +36,7 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
/**
- * This class contains auxiliary methods for unit tests in the Nephele common module.
- *
+ * This class contains reusable utility methods for unit tests.
*/
public class CommonTestUtils {
@@ -127,9 +125,7 @@ public class CommonTestUtils {
T copy = null;
try {
copy = clazz.newInstance();
- } catch (InstantiationException e) {
- fail(e.getMessage());
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
fail(e.getMessage());
}
@@ -157,19 +153,14 @@ public class CommonTestUtils {
baos.close();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- ObjectInputStream ois = new ObjectInputStream(bais);
- T copy;
- try {
- copy = (T) ois.readObject();
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ @SuppressWarnings("unchecked")
+ T copy = (T) ois.readObject();
+ return copy;
}
catch (ClassNotFoundException e) {
throw new IOException(e);
}
-
- ois.close();
- bais.close();
-
- return copy;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 4dd8173..1b04e35 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -79,16 +79,17 @@ webclient.port: 8080
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
-# Supported backends: jobmanager, filesystem
-
-state.backend: jobmanager
+# Supported backends: jobmanager, filesystem, <class-name-of-factory>
+#
+#state.backend: filesystem
-# Directory for storing checkpoints in a flink supported filesystem
-# Note: State backend must be accessible from the JobManager, use file://
-# only for local setups.
+# Directory for storing checkpoints in a Flink-supported filesystem
+# Note: State backend must be accessible from the JobManager and all TaskManagers.
+# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems
+# (or any local file system under Windows), or "s3://" for the S3 file system.
#
-# state.backend.fs.checkpointdir: hdfs://checkpoints
+# state.backend.fs.checkpointdir: hdfs://namenode-host:port/flink-checkpoints
#==============================================================================
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
index 091c739..c45990b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -67,35 +67,4 @@ public class FileStateHandle extends ByteStreamStateHandle {
public void discardState() throws Exception {
FileSystem.get(new URI(pathString)).delete(new Path(pathString), false);
}
-
- /**
- * Creates a {@link StateHandleProvider} for creating
- * {@link FileStateHandle}s for a given checkpoint directory.
- *
- */
- public static StateHandleProvider<Serializable> createProvider(String checkpointDir) {
- return new FileStateHandleProvider(checkpointDir);
- }
-
- /**
- * {@link StateHandleProvider} to generate {@link FileStateHandle}s for the
- * given checkpoint directory.
- *
- */
- private static class FileStateHandleProvider implements StateHandleProvider<Serializable> {
-
- private static final long serialVersionUID = 3496670017955260518L;
- private String path;
-
- public FileStateHandleProvider(String path) {
- this.path = path;
- }
-
- @Override
- public FileStateHandle createStateHandle(Serializable state) {
- return new FileStateHandle(state, path);
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index 1b524d8..f2be70a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -40,18 +40,5 @@ public class LocalStateHandle<T extends Serializable> implements StateHandle<T>
}
@Override
- public void discardState() throws Exception {
- }
-
- public static class LocalStateHandleProvider<R extends Serializable> implements
- StateHandleProvider<R> {
-
- private static final long serialVersionUID = 4665419208932921425L;
-
- @Override
- public LocalStateHandle<R> createStateHandle(R state) {
- return new LocalStateHandle<R>(state);
- }
-
- }
+ public void discardState() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
deleted file mode 100644
index bac490b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Serializable;
-
-/**
- * Stateful streaming operators use a StateHandleProvider to create new
- * {@link StateHandle}s to store each checkpoint in a persistent storage layer.
- */
-public interface StateHandleProvider<T> extends Serializable {
-
- /**
- * Creates a new {@link StateHandle} instance that will be used to store the
- * state checkpoint. This method is called for each state checkpoint saved.
- *
- * @param state
- * State to be stored in the handle.
- *
- */
- public StateHandle<T> createStateHandle(T state);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 269222f..c8d50c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -885,7 +885,6 @@ public class Task implements Runnable {
// build a local closure
final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
- final Logger logger = LOG;
final String taskName = taskNameWithSubtask;
Runnable runnable = new Runnable() {
@@ -919,7 +918,6 @@ public class Task implements Runnable {
// build a local closure
final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
- final Logger logger = LOG;
final String taskName = taskNameWithSubtask;
Runnable runnable = new Runnable() {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/pom.xml b/flink-staging/flink-fs-tests/pom.xml
index fe1abb3..021d822 100644
--- a/flink-staging/flink-fs-tests/pom.xml
+++ b/flink-staging/flink-fs-tests/pom.xml
@@ -42,24 +42,42 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@@ -67,6 +85,7 @@ under the License.
<type>test-jar</type>
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
</dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
new file mode 100644
index 0000000..8b7fb1c
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.StreamStateHandle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FileStateBackendTest {
+
+ private static File TEMP_DIR;
+
+ private static String HDFS_ROOT_URI;
+
+ private static MiniDFSCluster HDFS_CLUSTER;
+
+ private static FileSystem FS;
+
+ // ------------------------------------------------------------------------
+ // startup / shutdown
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Starts an HDFS mini cluster whose data lives in a uniquely named temp
+	 * directory and stores the cluster, its root URI and the matching file
+	 * system in the static fields of this test class. Any failure during
+	 * setup fails the test class.
+	 */
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			final String uniqueName = UUID.randomUUID().toString();
+			TEMP_DIR = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, uniqueName);
+
+			final Configuration clusterConfig = new Configuration();
+			clusterConfig.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEMP_DIR.getAbsolutePath());
+			HDFS_CLUSTER = new MiniDFSCluster.Builder(clusterConfig).build();
+
+			final String nameNodeHost = HDFS_CLUSTER.getURI().getHost();
+			final int nameNodePort = HDFS_CLUSTER.getNameNodePort();
+			HDFS_ROOT_URI = "hdfs://" + nameNodeHost + ":" + nameNodePort + "/";
+
+			FS = FileSystem.get(new URI(HDFS_ROOT_URI));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Could not create HDFS mini cluster " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Shuts down the HDFS mini cluster and removes its temp directory.
+	 *
+	 * <p>Cleanup is best-effort and never fails the test run. The cluster
+	 * reference is checked for null because setup may have failed before the
+	 * cluster was created, and the temp directory is removed in a finally
+	 * block so that a failing shutdown does not leave it behind.
+	 */
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			if (HDFS_CLUSTER != null) {
+				HDFS_CLUSTER.shutdown();
+			}
+		}
+		catch (Exception ignored) {
+			// best-effort: a failed shutdown must not mask the actual test results
+		}
+		finally {
+			// tolerates a null or missing directory and never throws
+			FileUtils.deleteQuietly(TEMP_DIR);
+		}
+	}
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ /**
+ * Checks the life cycle of an {@code FsStateBackend}: a freshly created backend and
+ * its serialized copy are uninitialized and have no checkpoint directory, writing
+ * state before initialization is rejected, initializing for a job creates an empty
+ * checkpoint directory, and disposing the job's state clears it again.
+ */
+ @Test
+ public void testSetupAndSerialization() {
+ try {
+ URI baseUri = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString());
+
+ FsStateBackend originalBackend = new FsStateBackend(baseUri);
+
+ // a new backend only knows its base path, nothing job-specific yet
+ assertFalse(originalBackend.isInitialized());
+ assertEquals(baseUri, originalBackend.getBasePath().toUri());
+ assertNull(originalBackend.getCheckpointDirectory());
+
+ // serialize / copy the backend
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
+ assertFalse(backend.isInitialized());
+ assertEquals(baseUri, backend.getBasePath().toUri());
+ assertNull(backend.getCheckpointDirectory());
+
+ // no file operations should be possible right now
+ try {
+ backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected: the backend has not been initialized for a job yet
+ }
+
+ backend.initializeForJob(new JobID());
+ assertNotNull(backend.getCheckpointDirectory());
+
+ // the checkpoint directory must exist and start out empty
+ Path checkpointDir = backend.getCheckpointDirectory();
+ assertTrue(FS.exists(checkpointDir));
+ assertTrue(isDirectoryEmpty(checkpointDir));
+
+ backend.disposeAllStateForCurrentJob();
+ assertNull(backend.getCheckpointDirectory());
+
+ // note: isDirectoryEmpty also reports a directory that cannot be listed as empty
+ assertTrue(isDirectoryEmpty(baseUri));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Checkpoints three serializable values (two strings and an integer) under the same
+ * checkpoint id, reads each one back through its state handle, and verifies that the
+ * checkpoint directory is non-empty until the last handle has been discarded.
+ */
+ @Test
+ public void testSerializableState() {
+
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
+ backend.initializeForJob(new JobID());
+
+ Path checkpointDir = backend.getCheckpointDirectory();
+
+ String state1 = "dummy state";
+ String state2 = "row row row your boat";
+ Integer state3 = 42;
+
+ // all three states are written for the same checkpoint id
+ StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
+ StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
+ StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
+
+ // read back and discard one handle at a time; the directory must stay
+ // non-empty as long as at least one handle has not been discarded
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ assertEquals(state1, handle1.getState(getClass().getClassLoader()));
+ handle1.discardState();
+
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ assertEquals(state2, handle2.getState(getClass().getClassLoader()));
+ handle2.discardState();
+
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ assertEquals(state3, handle3.getState(getClass().getClassLoader()));
+ handle3.discardState();
+
+ // the last discard must leave nothing behind
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Writes byte arrays of different sizes (large, one byte, empty, small) through
+ * checkpoint state output streams, then verifies that each resulting handle returns
+ * exactly the written bytes and that discarding a handle removes its file. Also checks
+ * that a stream works inside try-with-resources and that asking for a handle after
+ * {@code close()} fails with an {@link IOException}.
+ */
+ @Test
+ public void testStateOutputStream() {
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
+ backend.initializeForJob(new JobID());
+
+ Path checkpointDir = backend.getCheckpointDirectory();
+
+ byte[] state1 = new byte[1274673];
+ byte[] state2 = new byte[1];
+ byte[] state3 = new byte[0];
+ byte[] state4 = new byte[177];
+
+ // fill the arrays with random content (a no-op for the empty one)
+ Random rnd = new Random();
+ rnd.nextBytes(state1);
+ rnd.nextBytes(state2);
+ rnd.nextBytes(state3);
+ rnd.nextBytes(state4);
+
+ long checkpointId = 97231523452L;
+
+ // several streams are open at the same time for the same checkpoint id
+ FsStateBackend.FsCheckpointStateOutputStream stream1 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ FsStateBackend.FsCheckpointStateOutputStream stream2 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ FsStateBackend.FsCheckpointStateOutputStream stream3 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+
+ stream1.write(state1);
+ stream2.write(state2);
+ stream3.write(state3);
+
+ FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
+ FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
+ FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
+
+ // use with try-with-resources; the handle is obtained before the implicit close()
+ StreamStateHandle handle4;
+ try (StateBackend.CheckpointStateOutputStream stream4 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+ stream4.write(state4);
+ handle4 = stream4.closeAndGetHandle();
+ }
+
+ // close before accessing handle
+ StateBackend.CheckpointStateOutputStream stream5 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ stream5.write(state4);
+ stream5.close();
+ try {
+ stream5.closeAndGetHandle();
+ fail();
+ } catch (IOException e) {
+ // expected: no handle can be obtained from a stream that was already closed
+ }
+
+ // validate and discard the handles one by one; until the last handle is
+ // discarded, the checkpoint directory must still contain the remaining files
+ validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+ handle1.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureFileDeleted(handle1.getFilePath());
+
+ validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+ handle2.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureFileDeleted(handle2.getFilePath());
+
+ validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+ handle3.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureFileDeleted(handle3.getFilePath());
+
+ // after the last discard nothing may remain, which also means that the
+ // closed-without-handle stream5 must not have left a file behind
+ validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+ handle4.discardState();
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Asserts that no file exists at the given path in the test file system.
+	 *
+	 * @param path The path that is expected to be gone.
+	 * @throws IOException Thrown if the existence check itself fails. The error is
+	 *                     propagated rather than swallowed, because a check that could
+	 *                     not be performed must not count as a passed assertion.
+	 */
+	private static void ensureFileDeleted(Path path) throws IOException {
+		assertFalse(FS.exists(path));
+	}
+
+ private static boolean isDirectoryEmpty(URI directory) {
+ return isDirectoryEmpty(new Path(directory));
+ }
+
+	/**
+	 * Tells whether listing the given directory in the test file system yields
+	 * no entries. A directory whose listing fails with an {@link IOException}
+	 * is reported as empty as well.
+	 */
+	private static boolean isDirectoryEmpty(Path directory) {
+		FileStatus[] entries;
+		try {
+			entries = FS.listStatus(directory);
+		}
+		catch (IOException e) {
+			// a directory that cannot be listed is treated as empty
+			return true;
+		}
+
+		if (entries == null) {
+			return true;
+		}
+		return entries.length == 0;
+	}
+
+ private static String randomHdfsFileUri() {
+ return HDFS_ROOT_URI + UUID.randomUUID().toString();
+ }
+
+	/**
+	 * Asserts that the stream yields exactly the given bytes: not fewer, not more,
+	 * and with identical content. The stream is closed afterwards in all cases,
+	 * since this helper is its final consumer and the streams would otherwise
+	 * stay open.
+	 *
+	 * @param is The stream to read and close.
+	 * @param data The bytes the stream is expected to contain.
+	 * @throws IOException Thrown if reading from or closing the stream fails.
+	 */
+	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
+		try (InputStream in = is) {
+			byte[] holder = new byte[data.length];
+
+			// read until the expected number of bytes is available or the stream ends
+			int pos = 0;
+			int read;
+			while (pos < holder.length && (read = in.read(holder, pos, holder.length - pos)) != -1) {
+				pos += read;
+			}
+
+			assertEquals("not enough data", holder.length, pos);
+			// one more read must hit end-of-stream
+			assertEquals("too much data", -1, in.read());
+			assertArrayEquals("wrong data", data, holder);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateHandleTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateHandleTest.java
deleted file mode 100644
index 59ee5a9..0000000
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateHandleTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.util.SerializedValue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FileStateHandleTest {
-
- private String hdfsURI;
- private MiniDFSCluster hdfsCluster;
- private org.apache.hadoop.fs.Path hdPath;
- private org.apache.hadoop.fs.FileSystem hdfs;
-
- @Before
- public void createHDFS() {
- try {
- Configuration hdConf = new Configuration();
-
- File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
- FileUtil.fullyDelete(baseDir);
- hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
- hdfsCluster = builder.build();
-
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
- + hdfsCluster.getNameNodePort() + "/";
-
- hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
- hdfs = hdPath.getFileSystem(hdConf);
- hdfs.mkdirs(hdPath);
-
- } catch (Throwable e) {
- e.printStackTrace();
- Assert.fail("Test failed " + e.getMessage());
- }
- }
-
- @After
- public void destroyHDFS() {
- try {
- hdfs.delete(hdPath, true);
- hdfsCluster.shutdown();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- @Test
- public void testFileStateHandle() throws Exception {
-
- Serializable state = "state";
-
- // Create a state handle provider for the hdfs directory
- StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
- + hdPath);
-
- FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
-
- try {
- handleProvider.createStateHandle(null);
- fail();
- } catch (RuntimeException e) {
- // good
- }
-
- assertTrue(handle.stateFetched());
- assertFalse(handle.isWritten());
-
- // Serialize the handle so it writes the value to hdfs
- SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
- handle);
-
- assertTrue(handle.isWritten());
-
- // Deserialize the handle and verify that the state is not fetched yet
- FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
- .deserializeValue(Thread.currentThread().getContextClassLoader());
- assertFalse(deserializedHandle.stateFetched());
-
- // Fetch the and compare with original
- assertEquals(state, deserializedHandle.getState(this.getClass().getClassLoader()));
-
- // Test whether discard removes the checkpoint file properly
- assertTrue(hdfs.listFiles(hdPath, true).hasNext());
- deserializedHandle.discardState();
- assertFalse(hdfs.listFiles(hdPath, true).hasNext());
-
- }
-
-}