You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:01 UTC
[17/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples
package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
deleted file mode 100644
index 44c9017..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.base.Joiner;
-import java.io.File;
-import java.net.URI;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * Reads from a bounded source in batch execution.
- */
-public class ReadSourceITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public ReadSourceITCase(){
- }
-
- private static final String[] EXPECTED_RESULT = new String[] {
- "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
-
- // need to create the dir, otherwise Beam sinks don't
- // work for these tests
-
- if (!new File(new URI(resultPath)).mkdirs()) {
- throw new RuntimeException("Could not create output dir.");
- }
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(resultPath);
- }
-
- private static void runProgram(String resultPath) throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> result = p
- .apply(CountingInput.upTo(10))
- .apply(ParDo.of(new DoFn<Long, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element().toString());
- }
- }));
-
- result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
-
- p.run();
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
deleted file mode 100644
index 79b7882..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.base.Joiner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Reads from a bounded source in streaming.
- */
-public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
-
- protected String resultPath;
-
- public ReadSourceStreamingITCase(){
- }
-
- private static final String[] EXPECTED_RESULT = new String[] {
- "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(resultPath);
- }
-
- private static void runProgram(String resultPath) {
-
- Pipeline p = FlinkTestPipeline.createForStreaming();
-
- p
- .apply(CountingInput.upTo(10))
- .apply(ParDo.of(new DoFn<Long, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element().toString());
- }
- }))
- .apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
deleted file mode 100644
index 38b790e..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import static org.junit.Assert.assertNotNull;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * Tests the translation of custom Write sinks.
- */
-public class WriteSinkITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public WriteSinkITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "Joe red 3", "Mary blue 4", "Max yellow 23"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result-" + System.nanoTime());
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(resultPath);
- }
-
- @Override
- public void stopCluster() throws Exception {
- try {
- super.stopCluster();
- } catch (final IOException ioe) {
- if (ioe.getMessage().startsWith("Unable to delete file")) {
- // that's ok for the test itself, just the OS playing with us on cleanup phase
- }
- }
- }
-
- private static void runProgram(String resultPath) {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- p.apply(Create.of(ImmutableList.copyOf(EXPECTED_RESULT))).setCoder(StringUtf8Coder.of())
- .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
-
- p.run();
- }
-
- /**
- * Simple custom sink which writes to a file.
- */
- private static class MyCustomSink extends Sink<String> {
-
- private final String resultPath;
-
- public MyCustomSink(String resultPath) {
- this.resultPath = resultPath;
- }
-
- @Override
- public void validate(PipelineOptions options) {
- assertNotNull(options);
- }
-
- @Override
- public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
- return new MyWriteOperation();
- }
-
- private class MyWriteOperation extends WriteOperation<String, String> {
-
- @Override
- public Coder<String> getWriterResultCoder() {
- return StringUtf8Coder.of();
- }
-
- @Override
- public void initialize(PipelineOptions options) throws Exception {
-
- }
-
- @Override
- public void setWindowedWrites(boolean windowedWrites) {
-
- }
-
- @Override
- public void finalize(Iterable<String> writerResults, PipelineOptions options)
- throws Exception {
-
- }
-
- @Override
- public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
- return new MyWriter();
- }
-
- @Override
- public Sink<String> getSink() {
- return MyCustomSink.this;
- }
-
- /**
- * Simple Writer which writes to a file.
- */
- private class MyWriter extends Writer<String, String> {
-
- private PrintWriter internalWriter;
-
- @Override
- public final void openWindowed(String uId,
- BoundedWindow window,
- PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception {
- throw new UnsupportedOperationException("Windowed writes not supported.");
- }
-
- @Override
- public final void openUnwindowed(String uId, int shard, int numShards) throws Exception {
- Path path = new Path(resultPath + "/" + uId);
- FileSystem.get(new URI("file:///")).create(path, false);
- internalWriter = new PrintWriter(new File(path.toUri()));
- }
-
- @Override
- public void cleanup() throws Exception {
-
- }
-
- @Override
- public void write(String value) throws Exception {
- internalWriter.println(value);
- }
-
- @Override
- public String close() throws Exception {
- internalWriter.close();
- return resultPath;
- }
-
- @Override
- public WriteOperation<String, String> getWriteOperation() {
- return MyWriteOperation.this;
- }
- }
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
deleted file mode 100644
index 4c826d1..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
-import java.util.HashMap;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StatefulDoFnRunner;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link DoFnOperator}.
- */
-@RunWith(JUnit4.class)
-public class DoFnOperatorTest {
-
- // views and windows for testing side inputs
- private static final long WINDOW_MSECS_1 = 100;
- private static final long WINDOW_MSECS_2 = 500;
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
- private PCollectionView<Iterable<String>> view1 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy1);
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
- private PCollectionView<Iterable<String>> view2 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy2);
-
- @Test
- @SuppressWarnings("unchecked")
- public void testSingleOutput() throws Exception {
-
- WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
-
- TupleTag<String> outputTag = new TupleTag<>("main-output");
-
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
- new IdentityDoFn<String>(),
- windowedValueCoder,
- outputTag,
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory(),
- WindowingStrategy.globalDefault(),
- new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
- Collections.<PCollectionView<?>>emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
- null);
-
- OneInputStreamOperatorTestHarness<WindowedValue<String>, String> testHarness =
- new OneInputStreamOperatorTestHarness<>(doFnOperator);
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("Hello")));
-
- assertThat(
- this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
- contains(WindowedValue.valueInGlobalWindow("Hello")));
-
- testHarness.close();
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testMultiOutputOutput() throws Exception {
-
- WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
-
- TupleTag<String> mainOutput = new TupleTag<>("main-output");
- TupleTag<String> additionalOutput1 = new TupleTag<>("output-1");
- TupleTag<String> additionalOutput2 = new TupleTag<>("output-2");
- ImmutableMap<TupleTag<?>, Integer> outputMapping = ImmutableMap.<TupleTag<?>, Integer>builder()
- .put(mainOutput, 1)
- .put(additionalOutput1, 2)
- .put(additionalOutput2, 3)
- .build();
-
- DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
- new MultiOutputDoFn(additionalOutput1, additionalOutput2),
- windowedValueCoder,
- mainOutput,
- ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2),
- new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping),
- WindowingStrategy.globalDefault(),
- new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
- Collections.<PCollectionView<?>>emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
- null);
-
- OneInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue> testHarness =
- new OneInputStreamOperatorTestHarness<>(doFnOperator);
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("one")));
- testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("two")));
- testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("hello")));
-
- assertThat(
- this.stripStreamRecordFromRawUnion(testHarness.getOutput()),
- contains(
- new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")),
- new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")),
- new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")),
- new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")),
- new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello"))));
-
- testHarness.close();
- }
-
- @Test
- public void testLateDroppingForStatefulFn() throws Exception {
-
- WindowingStrategy<Object, IntervalWindow> windowingStrategy =
- WindowingStrategy.of(FixedWindows.of(new Duration(10)));
-
- DoFn<Integer, String> fn = new DoFn<Integer, String>() {
-
- @StateId("state")
- private final StateSpec<Object, ValueState<String>> stateSpec =
- StateSpecs.value(StringUtf8Coder.of());
-
- @ProcessElement
- public void processElement(ProcessContext context) {
- context.output(context.element().toString());
- }
- };
-
- WindowedValue.FullWindowedValueCoder<Integer> windowedValueCoder =
- WindowedValue.getFullCoder(
- VarIntCoder.of(),
- windowingStrategy.getWindowFn().windowCoder());
-
- TupleTag<String> outputTag = new TupleTag<>("main-output");
-
- DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>(
- fn,
- windowedValueCoder,
- outputTag,
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<String>>(),
- windowingStrategy,
- new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
- Collections.<PCollectionView<?>>emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
- VarIntCoder.of() /* key coder */);
-
- OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(
- doFnOperator,
- new KeySelector<WindowedValue<Integer>, Integer>() {
- @Override
- public Integer getKey(WindowedValue<Integer> integerWindowedValue) throws Exception {
- return integerWindowedValue.getValue();
- }
- },
- new CoderTypeInformation<>(VarIntCoder.of()));
-
- testHarness.open();
-
- testHarness.processWatermark(0);
-
- IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10));
-
- // this should not be late
- testHarness.processElement(
- new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING)));
-
- assertThat(
- this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
- contains(WindowedValue.of("13", new Instant(0), window1, PaneInfo.NO_FIRING)));
-
- testHarness.getOutput().clear();
-
- testHarness.processWatermark(9);
-
- // this should still not be considered late
- testHarness.processElement(
- new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING)));
-
- assertThat(
- this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
- contains(WindowedValue.of("17", new Instant(0), window1, PaneInfo.NO_FIRING)));
-
- testHarness.getOutput().clear();
-
- testHarness.processWatermark(10);
-
- // this should now be considered late
- testHarness.processElement(
- new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING)));
-
- assertThat(
- this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
- emptyIterable());
-
- testHarness.close();
- }
-
- @Test
- public void testStateGCForStatefulFn() throws Exception {
-
- WindowingStrategy<Object, IntervalWindow> windowingStrategy =
- WindowingStrategy.of(FixedWindows.of(new Duration(10))).withAllowedLateness(Duration.ZERO);
-
- final String timerId = "boo";
- final String stateId = "dazzle";
-
- final int offset = 5000;
- final int timerOutput = 4093;
-
- DoFn<KV<String, Integer>, KV<String, Integer>> fn =
- new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-
- @TimerId(timerId)
- private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- @StateId(stateId)
- private final StateSpec<Object, ValueState<String>> stateSpec =
- StateSpecs.value(StringUtf8Coder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext context,
- @TimerId(timerId) Timer timer,
- @StateId(stateId) ValueState<String> state,
- BoundedWindow window) {
- timer.set(window.maxTimestamp());
- state.write(context.element().getKey());
- context.output(
- KV.of(context.element().getKey(), context.element().getValue() + offset));
- }
-
- @OnTimer(timerId)
- public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> state) {
- context.output(KV.of(state.read(), timerOutput));
- }
- };
-
- WindowedValue.FullWindowedValueCoder<KV<String, Integer>> windowedValueCoder =
- WindowedValue.getFullCoder(
- KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
- windowingStrategy.getWindowFn().windowCoder());
-
- TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
-
- DoFnOperator<
- KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator =
- new DoFnOperator<>(
- fn,
- windowedValueCoder,
- outputTag,
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<String, Integer>>>(),
- windowingStrategy,
- new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
- Collections.<PCollectionView<?>>emptyList(), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
- StringUtf8Coder.of() /* key coder */);
-
- KeyedOneInputStreamOperatorTestHarness<
- String,
- WindowedValue<KV<String, Integer>>,
- WindowedValue<KV<String, Integer>>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(
- doFnOperator,
- new KeySelector<WindowedValue<KV<String, Integer>>, String>() {
- @Override
- public String getKey(
- WindowedValue<KV<String, Integer>> kvWindowedValue) throws Exception {
- return kvWindowedValue.getValue().getKey();
- }
- },
- new CoderTypeInformation<>(StringUtf8Coder.of()));
-
- testHarness.open();
-
- testHarness.processWatermark(0);
-
- assertEquals(0, testHarness.numKeyedStateEntries());
-
- IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10));
-
- testHarness.processElement(
- new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 5), new Instant(1), window1, PaneInfo.NO_FIRING)));
-
- testHarness.processElement(
- new StreamRecord<>(
- WindowedValue.of(KV.of("key2", 7), new Instant(3), window1, PaneInfo.NO_FIRING)));
-
- assertThat(
- this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
- contains(
- WindowedValue.of(
- KV.of("key1", 5 + offset), new Instant(1), window1, PaneInfo.NO_FIRING),
- WindowedValue.of(
- KV.of("key2", 7 + offset), new Instant(3), window1, PaneInfo.NO_FIRING)));
-
- assertEquals(2, testHarness.numKeyedStateEntries());
-
- testHarness.getOutput().clear();
-
- // this should trigger both the window.maxTimestamp() timer and the GC timer
- // this tests that the GC timer fires after the user timer
- testHarness.processWatermark(
- window1.maxTimestamp()
- .plus(windowingStrategy.getAllowedLateness())
- .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
- .getMillis());
-
- assertThat(
- this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
- contains(
- WindowedValue.of(
- KV.of("key1", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING),
- WindowedValue.of(
- KV.of("key2", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING)));
-
- // ensure the state was garbage collected
- assertEquals(0, testHarness.numKeyedStateEntries());
-
- testHarness.close();
- }
-
- public void testSideInputs(boolean keyed) throws Exception {
-
- WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
-
- TupleTag<String> outputTag = new TupleTag<>("main-output");
-
- ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
- ImmutableMap.<Integer, PCollectionView<?>>builder()
- .put(1, view1)
- .put(2, view2)
- .build();
-
- Coder<String> keyCoder = null;
- if (keyed) {
- keyCoder = StringUtf8Coder.of();
- }
-
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
- new IdentityDoFn<String>(),
- windowedValueCoder,
- outputTag,
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<String>(),
- WindowingStrategy.globalDefault(),
- sideInputMapping, /* side-input mapping */
- ImmutableList.<PCollectionView<?>>of(view1, view2), /* side inputs */
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
- keyCoder);
-
- TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, String> testHarness =
- new TwoInputStreamOperatorTestHarness<>(doFnOperator);
-
- if (keyed) {
- // we use a dummy key for the second input since it is considered to be broadcast
- testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
- doFnOperator,
- new StringKeySelector(),
- new DummyKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO);
- }
-
- testHarness.open();
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(500));
-
- // test the keep of sideInputs events
- testHarness.processElement2(
- new StreamRecord<>(
- new RawUnionValue(
- 1,
- valuesInWindow(ImmutableList.of("hello", "ciao"), new Instant(0), firstWindow))));
- testHarness.processElement2(
- new StreamRecord<>(
- new RawUnionValue(
- 2,
- valuesInWindow(ImmutableList.of("foo", "bar"), new Instant(0), secondWindow))));
-
- // push in a regular elements
- WindowedValue<String> helloElement = valueInWindow("Hello", new Instant(0), firstWindow);
- WindowedValue<String> worldElement = valueInWindow("World", new Instant(1000), firstWindow);
- testHarness.processElement1(new StreamRecord<>(helloElement));
- testHarness.processElement1(new StreamRecord<>(worldElement));
-
- // test the keep of pushed-back events
- testHarness.processElement2(
- new StreamRecord<>(
- new RawUnionValue(
- 1,
- valuesInWindow(ImmutableList.of("hello", "ciao"),
- new Instant(1000), firstWindow))));
- testHarness.processElement2(
- new StreamRecord<>(
- new RawUnionValue(
- 2,
- valuesInWindow(ImmutableList.of("foo", "bar"), new Instant(1000), secondWindow))));
-
- assertThat(
- this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
- contains(helloElement, worldElement));
-
- testHarness.close();
-
- }
-
- /**
- * {@link TwoInputStreamOperatorTestHarness} support OperatorStateBackend,
- * but don't support KeyedStateBackend. So we just test sideInput of normal ParDo.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testNormalParDoSideInputs() throws Exception {
- testSideInputs(false);
- }
-
- @Test
- public void testKeyedSideInputs() throws Exception {
- testSideInputs(true);
- }
-
- private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
- Iterable<Object> input) {
-
- return FluentIterable.from(input).filter(new Predicate<Object>() {
- @Override
- public boolean apply(@Nullable Object o) {
- return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue;
- }
- }).transform(new Function<Object, WindowedValue<T>>() {
- @Nullable
- @Override
- @SuppressWarnings({"unchecked", "rawtypes"})
- public WindowedValue<T> apply(@Nullable Object o) {
- if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue) {
- return (WindowedValue) ((StreamRecord) o).getValue();
- }
- throw new RuntimeException("unreachable");
- }
- });
- }
-
- private Iterable<RawUnionValue> stripStreamRecordFromRawUnion(Iterable<Object> input) {
- return FluentIterable.from(input).filter(new Predicate<Object>() {
- @Override
- public boolean apply(@Nullable Object o) {
- return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue;
- }
- }).transform(new Function<Object, RawUnionValue>() {
- @Nullable
- @Override
- @SuppressWarnings({"unchecked", "rawtypes"})
- public RawUnionValue apply(@Nullable Object o) {
- if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue) {
- return (RawUnionValue) ((StreamRecord) o).getValue();
- }
- throw new RuntimeException("unreachable");
- }
- });
- }
-
- private static class MultiOutputDoFn extends DoFn<String, String> {
- private TupleTag<String> additionalOutput1;
- private TupleTag<String> additionalOutput2;
-
- public MultiOutputDoFn(TupleTag<String> additionalOutput1, TupleTag<String> additionalOutput2) {
- this.additionalOutput1 = additionalOutput1;
- this.additionalOutput2 = additionalOutput2;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- if (c.element().equals("one")) {
- c.output(additionalOutput1, "extra: one");
- } else if (c.element().equals("two")) {
- c.output(additionalOutput2, "extra: two");
- } else {
- c.output("got: " + c.element());
- c.output(additionalOutput1, "got: " + c.element());
- c.output(additionalOutput2, "got: " + c.element());
- }
- }
- }
-
- private static class IdentityDoFn<T> extends DoFn<T, T> {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private WindowedValue<Iterable<?>> valuesInWindow(
- Iterable<?> values, Instant timestamp, BoundedWindow window) {
- return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING);
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private <T> WindowedValue<T> valueInWindow(
- T value, Instant timestamp, BoundedWindow window) {
- return WindowedValue.of(value, timestamp, window, PaneInfo.NO_FIRING);
- }
-
-
- private static class DummyKeySelector implements KeySelector<RawUnionValue, String> {
- @Override
- public String getKey(RawUnionValue stringWindowedValue) throws Exception {
- return "dummy_key";
- }
- }
-
- private static class StringKeySelector implements KeySelector<WindowedValue<String>, String> {
- @Override
- public String getKey(WindowedValue<String> stringWindowedValue) throws Exception {
- return stringWindowedValue.getValue();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
deleted file mode 100644
index 7e7d1e1..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-
-import java.util.Arrays;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.GroupingState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FlinkBroadcastStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class FlinkBroadcastStateInternalsTest {
- private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
- private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
- StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
- SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
-
- FlinkBroadcastStateInternals<String> underTest;
-
- @Before
- public void initStateInternals() {
- MemoryStateBackend backend = new MemoryStateBackend();
- try {
- OperatorStateBackend operatorStateBackend =
- backend.createOperatorStateBackend(new DummyEnvironment("test", 1, 0), "");
- underTest = new FlinkBroadcastStateInternals<>(1, operatorStateBackend);
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testValue() throws Exception {
- ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
-
- assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
- assertNotEquals(
- underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
- value);
-
- assertThat(value.read(), Matchers.nullValue());
- value.write("hello");
- assertThat(value.read(), Matchers.equalTo("hello"));
- value.write("world");
- assertThat(value.read(), Matchers.equalTo("world"));
-
- value.clear();
- assertThat(value.read(), Matchers.nullValue());
- assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-
- }
-
- @Test
- public void testBag() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
- assertThat(value.read(), Matchers.emptyIterable());
- value.add("hello");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
- value.add("world");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
- }
-
- @Test
- public void testBagIsEmpty() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeBagIntoSource() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
- // Reading the merged bag gets both the contents
- assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testMergeBagIntoNewNamespace() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
- BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
- // Reading the merged bag gets both the contents
- assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag1.read(), Matchers.emptyIterable());
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testCombiningValue() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
- assertThat(value.read(), Matchers.equalTo(0));
- value.add(2);
- assertThat(value.read(), Matchers.equalTo(2));
-
- value.add(3);
- assertThat(value.read(), Matchers.equalTo(5));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(0));
- assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
- }
-
- @Test
- public void testCombiningIsEmpty() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(5);
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeCombiningValueIntoSource() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- assertThat(value1.read(), Matchers.equalTo(11));
- assertThat(value2.read(), Matchers.equalTo(10));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
- assertThat(value1.read(), Matchers.equalTo(21));
- assertThat(value2.read(), Matchers.equalTo(0));
- }
-
- @Test
- public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value3 =
- underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
- // Merging clears the old values and updates the result value.
- assertThat(value1.read(), Matchers.equalTo(0));
- assertThat(value2.read(), Matchers.equalTo(0));
- assertThat(value3.read(), Matchers.equalTo(21));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
deleted file mode 100644
index 5433d07..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.operators.KeyContext;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FlinkKeyGroupStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class FlinkKeyGroupStateInternalsTest {
- private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
- private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
-
- FlinkKeyGroupStateInternals<String> underTest;
- private KeyedStateBackend keyedStateBackend;
-
- @Before
- public void initStateInternals() {
- try {
- keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1));
- underTest = new FlinkKeyGroupStateInternals<>(StringUtf8Coder.of(), keyedStateBackend);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private KeyedStateBackend getKeyedStateBackend(int numberOfKeyGroups,
- KeyGroupRange keyGroupRange) {
- MemoryStateBackend backend = new MemoryStateBackend();
- try {
- AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend = backend.createKeyedStateBackend(
- new DummyEnvironment("test", 1, 0),
- new JobID(),
- "test_op",
- new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new ExecutionConfig()),
- numberOfKeyGroups,
- keyGroupRange,
- new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
- keyedStateBackend.setCurrentKey(ByteBuffer.wrap(
- CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "1")));
- return keyedStateBackend;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testBag() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
- assertThat(value.read(), Matchers.emptyIterable());
- value.add("hello");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
- value.add("world");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
- }
-
- @Test
- public void testBagIsEmpty() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeBagIntoSource() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
- // Reading the merged bag gets both the contents
- assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testMergeBagIntoNewNamespace() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
- BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
- // Reading the merged bag gets both the contents
- assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag1.read(), Matchers.emptyIterable());
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testKeyGroupAndCheckpoint() throws Exception {
- // assign to keyGroup 0
- ByteBuffer key0 = ByteBuffer.wrap(
- CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "11111111"));
- // assign to keyGroup 1
- ByteBuffer key1 = ByteBuffer.wrap(
- CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "22222222"));
- FlinkKeyGroupStateInternals<String> allState;
- {
- KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1));
- allState = new FlinkKeyGroupStateInternals<>(
- StringUtf8Coder.of(), keyedStateBackend);
- BagState<String> valueForNamespace0 = allState.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> valueForNamespace1 = allState.state(NAMESPACE_2, STRING_BAG_ADDR);
- keyedStateBackend.setCurrentKey(key0);
- valueForNamespace0.add("0");
- valueForNamespace1.add("2");
- keyedStateBackend.setCurrentKey(key1);
- valueForNamespace0.add("1");
- valueForNamespace1.add("3");
- assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", "1"));
- assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", "3"));
- }
-
- ClassLoader classLoader = FlinkKeyGroupStateInternalsTest.class.getClassLoader();
-
- // 1. scale up
- ByteArrayOutputStream out0 = new ByteArrayOutputStream();
- allState.snapshotKeyGroupState(0, new DataOutputStream(out0));
- DataInputStream in0 = new DataInputStream(
- new ByteArrayInputStream(out0.toByteArray()));
- {
- KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 0));
- FlinkKeyGroupStateInternals<String> state0 =
- new FlinkKeyGroupStateInternals<>(
- StringUtf8Coder.of(), keyedStateBackend);
- state0.restoreKeyGroupState(0, in0, classLoader);
- BagState<String> valueForNamespace0 = state0.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> valueForNamespace1 = state0.state(NAMESPACE_2, STRING_BAG_ADDR);
- assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0"));
- assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2"));
- }
-
- ByteArrayOutputStream out1 = new ByteArrayOutputStream();
- allState.snapshotKeyGroupState(1, new DataOutputStream(out1));
- DataInputStream in1 = new DataInputStream(
- new ByteArrayInputStream(out1.toByteArray()));
- {
- KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(1, 1));
- FlinkKeyGroupStateInternals<String> state1 =
- new FlinkKeyGroupStateInternals<>(
- StringUtf8Coder.of(), keyedStateBackend);
- state1.restoreKeyGroupState(1, in1, classLoader);
- BagState<String> valueForNamespace0 = state1.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> valueForNamespace1 = state1.state(NAMESPACE_2, STRING_BAG_ADDR);
- assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("1"));
- assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("3"));
- }
-
- // 2. scale down
- {
- KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1));
- FlinkKeyGroupStateInternals<String> newAllState = new FlinkKeyGroupStateInternals<>(
- StringUtf8Coder.of(), keyedStateBackend);
- in0.reset();
- in1.reset();
- newAllState.restoreKeyGroupState(0, in0, classLoader);
- newAllState.restoreKeyGroupState(1, in1, classLoader);
- BagState<String> valueForNamespace0 = newAllState.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> valueForNamespace1 = newAllState.state(NAMESPACE_2, STRING_BAG_ADDR);
- assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", "1"));
- assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", "3"));
- }
-
- }
-
- private static class TestKeyContext implements KeyContext {
-
- private Object key;
-
- @Override
- public void setCurrentKey(Object key) {
- this.key = key;
- }
-
- @Override
- public Object getCurrentKey() {
- return key;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
deleted file mode 100644
index 08ae0c4..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FlinkSplitStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class FlinkSplitStateInternalsTest {
- private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
- private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
-
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
-
- FlinkSplitStateInternals<String> underTest;
-
- @Before
- public void initStateInternals() {
- MemoryStateBackend backend = new MemoryStateBackend();
- try {
- OperatorStateBackend operatorStateBackend =
- backend.createOperatorStateBackend(new DummyEnvironment("test", 1, 0), "");
- underTest = new FlinkSplitStateInternals<>(operatorStateBackend);
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testBag() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
- assertThat(value.read(), Matchers.emptyIterable());
- value.add("hello");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
- value.add("world");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
- }
-
- @Test
- public void testBagIsEmpty() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
deleted file mode 100644
index d140271..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.GroupingState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FlinkStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class FlinkStateInternalsTest {
- private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
- private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
- private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
- StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
- SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
-
- FlinkStateInternals<String> underTest;
-
- @Before
- public void initStateInternals() {
- MemoryStateBackend backend = new MemoryStateBackend();
- try {
- AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend = backend.createKeyedStateBackend(
- new DummyEnvironment("test", 1, 0),
- new JobID(),
- "test_op",
- new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new ExecutionConfig()),
- 1,
- new KeyGroupRange(0, 0),
- new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
- underTest = new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of());
-
- keyedStateBackend.setCurrentKey(
- ByteBuffer.wrap(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Hello")));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testValue() throws Exception {
- ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
-
- assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
- assertNotEquals(
- underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
- value);
-
- assertThat(value.read(), Matchers.nullValue());
- value.write("hello");
- assertThat(value.read(), Matchers.equalTo("hello"));
- value.write("world");
- assertThat(value.read(), Matchers.equalTo("world"));
-
- value.clear();
- assertThat(value.read(), Matchers.nullValue());
- assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-
- }
-
- @Test
- public void testBag() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
- assertThat(value.read(), Matchers.emptyIterable());
- value.add("hello");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
- value.add("world");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
- }
-
- @Test
- public void testBagIsEmpty() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeBagIntoSource() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
- // Reading the merged bag gets both the contents
- assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testMergeBagIntoNewNamespace() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
- BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
- // Reading the merged bag gets both the contents
- assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag1.read(), Matchers.emptyIterable());
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testCombiningValue() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
- assertThat(value.read(), Matchers.equalTo(0));
- value.add(2);
- assertThat(value.read(), Matchers.equalTo(2));
-
- value.add(3);
- assertThat(value.read(), Matchers.equalTo(5));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(0));
- assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
- }
-
- @Test
- public void testCombiningIsEmpty() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(5);
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeCombiningValueIntoSource() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- assertThat(value1.read(), Matchers.equalTo(11));
- assertThat(value2.read(), Matchers.equalTo(10));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
- assertThat(value1.read(), Matchers.equalTo(21));
- assertThat(value2.read(), Matchers.equalTo(0));
- }
-
- @Test
- public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value3 =
- underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
- // Merging clears the old values and updates the result value.
- assertThat(value1.read(), Matchers.equalTo(0));
- assertThat(value2.read(), Matchers.equalTo(0));
- assertThat(value3.read(), Matchers.equalTo(21));
- }
-
- @Test
- public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(3000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(1000));
- assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
- }
-
- @Test
- public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(3000));
- assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
- value.add(new Instant(1000));
- assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
- }
-
- @Test
- public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
- }
-
- @Test
- public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(new Instant(1000));
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
- underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the merged value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
- assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
- assertThat(value2.read(), Matchers.equalTo(null));
- }
-
- @Test
- public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
- underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value3 =
- underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
- // Merging clears the old values and updates the result value.
- assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
- assertThat(value1.read(), Matchers.equalTo(null));
- assertThat(value2.read(), Matchers.equalTo(null));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
deleted file mode 100644
index 663b910..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import com.google.common.base.Joiner;
-import java.io.Serializable;
-import java.util.Arrays;
-import org.apache.beam.runners.flink.FlinkTestPipeline;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Test for GroupByNullKey.
- */
-public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
-
-
- protected String resultPath;
-
- static final String[] EXPECTED_RESULT = new String[] {
- "k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
- };
-
- public GroupByNullKeyTest(){
- }
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- /**
- * DoFn extracting user and timestamp.
- */
- private static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- KV<Integer, String> record = c.element();
- int timestamp = record.getKey();
- String userName = record.getValue();
- if (userName != null) {
- // Sets the implicit timestamp field to be used in windowing.
- c.outputWithTimestamp(userName, new Instant(timestamp));
- }
- }
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForStreaming();
-
- PCollection<String> output =
- p.apply(Create.of(Arrays.asList(
- KV.<Integer, String>of(0, "user1"),
- KV.<Integer, String>of(1, "user1"),
- KV.<Integer, String>of(2, "user1"),
- KV.<Integer, String>of(10, "user2"),
- KV.<Integer, String>of(1, "user2"),
- KV.<Integer, String>of(15000, "user2"),
- KV.<Integer, String>of(12000, "user2"),
- KV.<Integer, String>of(25000, "user3"))))
- .apply(ParDo.of(new ExtractUserAndTimestamp()))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
- .triggering(AfterWatermark.pastEndOfWindow())
- .withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes())
-
- .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- String elem = c.element();
- c.output(KV.<Void, String>of(null, elem));
- }
- }))
- .apply(GroupByKey.<Void, String>create())
- .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- KV<Void, Iterable<String>> elem = c.element();
- StringBuilder str = new StringBuilder();
- str.append("k: " + elem.getKey() + " v:");
- for (String v : elem.getValue()) {
- str.append(" " + v);
- }
- c.output(str.toString());
- }
- }));
- output.apply(TextIO.Write.to(resultPath));
- p.run();
- }
-}