You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:18 UTC
[09/18] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
deleted file mode 100644
index 3a08088..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.DelegateCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.KV;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An unbounded source for testing the unbounded sources framework code.
- *
- * <p>Each split of this sources produces records of the form KV(split_id, i),
- * where i counts up from 0. Each record has a timestamp of i, and the watermark
- * accurately tracks these timestamps. The reader will occasionally return false
- * from {@code advance}, in order to simulate a source where not all the data is
- * available immediately.
- */
-public class TestCountingSource
- extends UnboundedSource<KV<Integer, Integer>, TestCountingSource.CounterMark> {
- private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class);
-
- private static List<Integer> finalizeTracker;
- private final int numMessagesPerShard;
- private final int shardNumber;
- private final boolean dedup;
- private final boolean throwOnFirstSnapshot;
- private final boolean allowSplitting;
-
- /**
- * We only allow an exception to be thrown from getCheckpointMark
- * at most once. This must be static since the entire TestCountingSource
- * instance may be re-serialized when the pipeline recovers and retries.
- */
- private static boolean thrown = false;
-
- public static void setFinalizeTracker(List<Integer> finalizeTracker) {
- TestCountingSource.finalizeTracker = finalizeTracker;
- }
-
- public TestCountingSource(int numMessagesPerShard) {
- this(numMessagesPerShard, 0, false, false, true);
- }
-
- public TestCountingSource withDedup() {
- return new TestCountingSource(
- numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, true);
- }
-
- private TestCountingSource withShardNumber(int shardNumber) {
- return new TestCountingSource(
- numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
- }
-
- public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) {
- return new TestCountingSource(
- numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
- }
-
- public TestCountingSource withoutSplitting() {
- return new TestCountingSource(
- numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, false);
- }
-
- private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup,
- boolean throwOnFirstSnapshot, boolean allowSplitting) {
- this.numMessagesPerShard = numMessagesPerShard;
- this.shardNumber = shardNumber;
- this.dedup = dedup;
- this.throwOnFirstSnapshot = throwOnFirstSnapshot;
- this.allowSplitting = allowSplitting;
- }
-
- public int getShardNumber() {
- return shardNumber;
- }
-
- @Override
- public List<TestCountingSource> split(
- int desiredNumSplits, PipelineOptions options) {
- List<TestCountingSource> splits = new ArrayList<>();
- int numSplits = allowSplitting ? desiredNumSplits : 1;
- for (int i = 0; i < numSplits; i++) {
- splits.add(withShardNumber(i));
- }
- return splits;
- }
-
- class CounterMark implements UnboundedSource.CheckpointMark {
- int current;
-
- public CounterMark(int current) {
- this.current = current;
- }
-
- @Override
- public void finalizeCheckpoint() {
- if (finalizeTracker != null) {
- finalizeTracker.add(current);
- }
- }
- }
-
- @Override
- public Coder<CounterMark> getCheckpointMarkCoder() {
- return DelegateCoder.of(
- VarIntCoder.of(),
- new DelegateCoder.CodingFunction<CounterMark, Integer>() {
- @Override
- public Integer apply(CounterMark input) {
- return input.current;
- }
- },
- new DelegateCoder.CodingFunction<Integer, CounterMark>() {
- @Override
- public CounterMark apply(Integer input) {
- return new CounterMark(input);
- }
- });
- }
-
- @Override
- public boolean requiresDeduping() {
- return dedup;
- }
-
- /**
- * Public only so that the checkpoint can be conveyed from {@link #getCheckpointMark()} to
- * {@link TestCountingSource#createReader(PipelineOptions, CounterMark)} without cast.
- */
- public class CountingSourceReader extends UnboundedReader<KV<Integer, Integer>> {
- private int current;
-
- public CountingSourceReader(int startingPoint) {
- this.current = startingPoint;
- }
-
- @Override
- public boolean start() {
- return advance();
- }
-
- @Override
- public boolean advance() {
- if (current >= numMessagesPerShard - 1) {
- return false;
- }
- // If testing dedup, occasionally insert a duplicate value.
- if (current >= 0 && dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
- return true;
- }
- current++;
- return true;
- }
-
- @Override
- public KV<Integer, Integer> getCurrent() {
- return KV.of(shardNumber, current);
- }
-
- @Override
- public Instant getCurrentTimestamp() {
- return new Instant(current);
- }
-
- @Override
- public byte[] getCurrentRecordId() {
- try {
- return encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), getCurrent());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {}
-
- @Override
- public TestCountingSource getCurrentSource() {
- return TestCountingSource.this;
- }
-
- @Override
- public Instant getWatermark() {
- // The watermark is a promise about future elements, and the timestamps of elements are
- // strictly increasing for this source.
- return new Instant(current + 1);
- }
-
- @Override
- public CounterMark getCheckpointMark() {
- if (throwOnFirstSnapshot && !thrown) {
- thrown = true;
- LOG.error("Throwing exception while checkpointing counter");
- throw new RuntimeException("failed during checkpoint");
- }
- // The checkpoint can assume all records read, including the current, have
- // been committed.
- return new CounterMark(current);
- }
-
- @Override
- public long getSplitBacklogBytes() {
- return 7L;
- }
- }
-
- @Override
- public CountingSourceReader createReader(
- PipelineOptions options, @Nullable CounterMark checkpointMark) {
- if (checkpointMark == null) {
- LOG.debug("creating reader");
- } else {
- LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current);
- }
- return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
- }
-
- @Override
- public void validate() {}
-
- @Override
- public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
- return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
deleted file mode 100644
index 9e6bba8..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.base.Joiner;
-import java.io.Serializable;
-import java.util.Arrays;
-import org.apache.beam.runners.flink.FlinkTestPipeline;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-
-/**
- * Session window test.
- */
-public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
- protected String resultPath;
-
- public TopWikipediaSessionsITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "user: user1 value:3",
- "user: user1 value:1",
- "user: user2 value:4",
- "user: user2 value:6",
- "user: user3 value:7",
- "user: user3 value:2"
- };
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForStreaming();
-
- Long now = (System.currentTimeMillis() + 10000) / 1000;
-
- PCollection<KV<String, Long>> output =
- p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now)
- .set("contributor_username", "user3"))))
-
-
-
- .apply(ParDo.of(new DoFn<TableRow, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- TableRow row = c.element();
- long timestamp = (Integer) row.get("timestamp");
- String userName = (String) row.get("contributor_username");
- if (userName != null) {
- // Sets the timestamp field to be used in windowing.
- c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
- }
- }
- }))
-
- .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
-
- .apply(Count.<String>perElement());
-
- PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- KV<String, Long> el = c.element();
- String out = "user: " + el.getKey() + " value:" + el.getValue();
- c.output(out);
- }
- }));
-
- format.apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
deleted file mode 100644
index 90f95d6..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.mockito.Matchers;
-
-/**
- * Tests for {@link UnboundedSourceWrapper}.
- */
-@RunWith(Enclosed.class)
-public class UnboundedSourceWrapperTest {
-
- /**
- * Parameterized tests.
- */
- @RunWith(Parameterized.class)
- public static class UnboundedSourceWrapperTestWithParams {
- private final int numTasks;
- private final int numSplits;
-
- public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) {
- this.numTasks = numTasks;
- this.numSplits = numSplits;
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> data() {
- /*
- * Parameters for initializing the tests:
- * {numTasks, numSplits}
- * The test currently assumes powers of two for some assertions.
- */
- return Arrays.asList(new Object[][]{
- {1, 1}, {1, 2}, {1, 4},
- {2, 1}, {2, 2}, {2, 4},
- {4, 1}, {4, 2}, {4, 4}
- });
- }
-
- /**
- * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
- * If numSplits > numTasks, one source will manage multiple readers.
- */
- @Test
- public void testReaders() throws Exception {
- final int numElements = 20;
- final Object checkpointLock = new Object();
- PipelineOptions options = PipelineOptionsFactory.create();
-
- // this source will emit exactly NUM_ELEMENTS across all parallel readers,
- // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
- // elements later.
- TestCountingSource source = new TestCountingSource(numElements);
- UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, numSplits);
-
- assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
- StreamSource<WindowedValue<
- KV<Integer, Integer>>,
- UnboundedSourceWrapper<
- KV<Integer, Integer>,
- TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
- setupSourceOperator(sourceOperator, numTasks);
-
- try {
- sourceOperator.open();
- sourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
- private int count = 0;
-
- @Override
- public void emitWatermark(Watermark watermark) {
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- }
-
- @Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
- count++;
- if (count >= numElements) {
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() {
-
- }
- });
- } catch (SuccessException e) {
-
- assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
-
- // success
- return;
- }
- fail("Read terminated without producing expected number of outputs");
- }
-
- /**
- * Verify that snapshot/restore work as expected. We bring up a source and cancel
- * after seeing a certain number of elements. Then we snapshot that source,
- * bring up a completely new source that we restore from the snapshot and verify
- * that we see all expected elements in the end.
- */
- @Test
- public void testRestore() throws Exception {
- final int numElements = 20;
- final Object checkpointLock = new Object();
- PipelineOptions options = PipelineOptionsFactory.create();
-
- // this source will emit exactly NUM_ELEMENTS across all parallel readers,
- // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
- // elements later.
- TestCountingSource source = new TestCountingSource(numElements);
- UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, numSplits);
-
- assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
- StreamSource<
- WindowedValue<KV<Integer, Integer>>,
- UnboundedSourceWrapper<
- KV<Integer, Integer>,
- TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
-
- OperatorStateStore backend = mock(OperatorStateStore.class);
-
- TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
- listState = new TestingListState<>();
-
- when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
- .thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(backend);
- when(initializationContext.isRestored()).thenReturn(false, true);
-
- flinkWrapper.initializeState(initializationContext);
-
- setupSourceOperator(sourceOperator, numTasks);
-
- final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
-
- boolean readFirstBatchOfElements = false;
-
- try {
- sourceOperator.open();
- sourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
- private int count = 0;
-
- @Override
- public void emitWatermark(Watermark watermark) {
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- }
-
- @Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
- emittedElements.add(windowedValueStreamRecord.getValue().getValue());
- count++;
- if (count >= numElements / 2) {
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() {
-
- }
- });
- } catch (SuccessException e) {
- // success
- readFirstBatchOfElements = true;
- }
-
- assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
-
- // draw a snapshot
- flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
-
- // test snapshot offsets
- assertEquals(flinkWrapper.getLocalSplitSources().size(),
- listState.getList().size());
- int totalEmit = 0;
- for (KV<UnboundedSource, TestCountingSource.CounterMark> kv : listState.get()) {
- totalEmit += kv.getValue().current + 1;
- }
- assertEquals(numElements / 2, totalEmit);
-
- // test that finalizeCheckpoint on CheckpointMark is called
- final ArrayList<Integer> finalizeList = new ArrayList<>();
- TestCountingSource.setFinalizeTracker(finalizeList);
- flinkWrapper.notifyCheckpointComplete(0);
- assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
-
- // create a completely new source but restore from the snapshot
- TestCountingSource restoredSource = new TestCountingSource(numElements);
- UnboundedSourceWrapper<
- KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
- new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
-
- assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
-
- StreamSource<
- WindowedValue<KV<Integer, Integer>>,
- UnboundedSourceWrapper<
- KV<Integer, Integer>,
- TestCountingSource.CounterMark>> restoredSourceOperator =
- new StreamSource<>(restoredFlinkWrapper);
-
- setupSourceOperator(restoredSourceOperator, numTasks);
-
- // restore snapshot
- restoredFlinkWrapper.initializeState(initializationContext);
-
- boolean readSecondBatchOfElements = false;
-
- // run again and verify that we see the other elements
- try {
- restoredSourceOperator.open();
- restoredSourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
- private int count = 0;
-
- @Override
- public void emitWatermark(Watermark watermark) {
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- }
-
- @Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
- emittedElements.add(windowedValueStreamRecord.getValue().getValue());
- count++;
- if (count >= numElements / 2) {
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() {
-
- }
- });
- } catch (SuccessException e) {
- // success
- readSecondBatchOfElements = true;
- }
-
- assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
-
- assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
-
- // verify that we saw all NUM_ELEMENTS elements
- assertTrue(emittedElements.size() == numElements);
- }
-
- @Test
- public void testNullCheckpoint() throws Exception {
- final int numElements = 20;
- PipelineOptions options = PipelineOptionsFactory.create();
-
- TestCountingSource source = new TestCountingSource(numElements) {
- @Override
- public Coder<CounterMark> getCheckpointMarkCoder() {
- return null;
- }
- };
- UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, numSplits);
-
- OperatorStateStore backend = mock(OperatorStateStore.class);
-
- TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
- listState = new TestingListState<>();
-
- when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
- .thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(backend);
- when(initializationContext.isRestored()).thenReturn(false, true);
-
- flinkWrapper.initializeState(initializationContext);
-
- StreamSource sourceOperator = new StreamSource<>(flinkWrapper);
- setupSourceOperator(sourceOperator, numTasks);
- sourceOperator.open();
-
- flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
-
- assertEquals(0, listState.getList().size());
-
- UnboundedSourceWrapper<
- KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
- new UnboundedSourceWrapper<>(options, new TestCountingSource(numElements),
- numSplits);
-
- StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper);
- setupSourceOperator(restoredSourceOperator, numTasks);
- sourceOperator.open();
-
- restoredFlinkWrapper.initializeState(initializationContext);
-
- assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
-
- }
-
- @SuppressWarnings("unchecked")
- private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
- ExecutionConfig executionConfig = new ExecutionConfig();
- StreamConfig cfg = new StreamConfig(new Configuration());
-
- cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
-
- Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
-
- StreamTask<?, ?> mockTask = mock(StreamTask.class);
- when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(new Object());
- when(mockTask.getConfiguration()).thenReturn(cfg);
- when(mockTask.getEnvironment()).thenReturn(env);
- when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
- when(mockTask.getAccumulatorMap())
- .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
- TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
- when(mockTask.getProcessingTimeService()).thenReturn(testProcessingTimeService);
-
- operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
- }
-
- /**
- * A special {@link RuntimeException} that we throw to signal that the test was successful.
- */
- private static class SuccessException extends RuntimeException {
- }
- }
-
- /**
- * Not parameterized tests.
- */
- public static class BasicTest {
-
- /**
- * Check serialization a {@link UnboundedSourceWrapper}.
- */
- @Test
- public void testSerialization() throws Exception {
- final int parallelism = 1;
- final int numElements = 20;
- PipelineOptions options = PipelineOptionsFactory.create();
-
- TestCountingSource source = new TestCountingSource(numElements);
- UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, parallelism);
-
- InstantiationUtil.serializeObject(flinkWrapper);
- }
-
- }
-
- private static final class TestingListState<T> implements ListState<T> {
-
- private final List<T> list = new ArrayList<>();
-
- @Override
- public void clear() {
- list.clear();
- }
-
- @Override
- public Iterable<T> get() throws Exception {
- return list;
- }
-
- @Override
- public void add(T value) throws Exception {
- list.add(value);
- }
-
- public List<T> getList() {
- return list;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
deleted file mode 100644
index 08a1e03..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.streaming;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/resources/log4j-test.properties b/runners/flink/runner/src/test/resources/log4j-test.properties
deleted file mode 100644
index 4c74d85..0000000
--- a/runners/flink/runner/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
new file mode 100644
index 0000000..b745f0b
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+
+/**
+ * {@link DefaultValueFactory} for getting a default value for the parallelism option
+ * on {@link FlinkPipelineOptions}.
+ *
+ * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}.
+ * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink
+ * run scripts.
+ */
+public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
+ @Override
+ public Integer create(PipelineOptions options) {
+ return GlobalConfiguration.loadConfiguration()
+ .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
new file mode 100644
index 0000000..854b674
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
+ * Flink batch job.
+ */
+class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
+
+ /**
+ * The necessary context in the case of a batch job.
+ */
+ private final FlinkBatchTranslationContext batchContext;
+
+ private int depth = 0;
+
+ public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
+ this.batchContext = new FlinkBatchTranslationContext(env, options);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes, unchecked")
+ public void translate(Pipeline pipeline) {
+ super.translate(pipeline);
+
+ // terminate dangling DataSets
+ for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) {
+ dataSet.output(new DiscardingOutputFormat());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Pipeline Visitor Methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
+ this.depth++;
+
+ BatchTransformTranslator<?> translator = getTranslator(node);
+
+ if (translator != null) {
+ applyBatchTransform(node.getTransform(), node, translator);
+ LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ } else {
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {
+ this.depth--;
+ LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
+
+ // get the transformation corresponding to the node we are
+ // currently visiting and translate it into its Flink alternative.
+ PTransform<?, ?> transform = node.getTransform();
+ BatchTransformTranslator<?> translator =
+ FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator == null) {
+ LOG.info(node.getTransform().getClass().toString());
+ throw new UnsupportedOperationException("The transform " + transform
+ + " is currently not supported.");
+ }
+ applyBatchTransform(transform, node, translator);
+ }
+
+ private <T extends PTransform<?, ?>> void applyBatchTransform(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ BatchTransformTranslator<?> translator) {
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+
+ @SuppressWarnings("unchecked")
+ BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+ // create the applied PTransform on the batchContext
+ batchContext.setCurrentTransform(node.toAppliedPTransform());
+ typedTranslator.translateNode(typedTransform, batchContext);
+ }
+
+ /**
+ * A translator of a {@link PTransform}.
+ */
+ public interface BatchTransformTranslator<TransformT extends PTransform> {
+ void translateNode(TransformT transform, FlinkBatchTranslationContext context);
+ }
+
+ /**
+ * Returns a translator for the given node, if it is possible, otherwise null.
+ */
+ private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
+ PTransform<?, ?> transform = node.getTransform();
+
+ // Root of the graph is null
+ if (transform == null) {
+ return null;
+ }
+
+ return FlinkBatchTransformTranslators.getTranslator(transform);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
new file mode 100644
index 0000000..ff9521c
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.util.Collector;
+
+/**
+ * Translators for transforming {@link PTransform PTransforms} to
+ * Flink {@link DataSet DataSets}.
+ */
+class FlinkBatchTransformTranslators {
+
+ // --------------------------------------------------------------------------------------------
+ // Transform Translator Registry
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("rawtypes")
+ private static final Map<
+ Class<? extends PTransform>,
+ FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
+
+ static {
+ TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
+
+ TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
+ TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
+ TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
+
+ TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
+
+ TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch());
+
+ TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch());
+
+ TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
+ }
+
+
+ static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
+ PTransform<?, ?> transform) {
+ return TRANSLATORS.get(transform.getClass());
+ }
+
+ private static class ReadSourceTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
+
+ @Override
+ public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
+ String name = transform.getName();
+ BoundedSource<T> source = transform.getSource();
+ PCollection<T> output = context.getOutput(transform);
+
+ TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
+
+ DataSource<WindowedValue<T>> dataSource = new DataSource<>(
+ context.getExecutionEnvironment(),
+ new SourceInputFormat<>(source, context.getPipelineOptions()),
+ typeInformation,
+ name);
+
+ context.setOutputDataSet(output, dataSource);
+ }
+ }
+
+ private static class WindowAssignTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>> {
+
+ @Override
+ public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationContext context) {
+ PValue input = context.getInput(transform);
+
+ TypeInformation<WindowedValue<T>> resultTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
+
+ @SuppressWarnings("unchecked")
+ final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
+ (WindowingStrategy<T, ? extends BoundedWindow>)
+ context.getOutput(transform).getWindowingStrategy();
+
+ WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+
+ FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+ new FlinkAssignWindows<>(windowFn);
+
+ DataSet<WindowedValue<T>> resultDataSet = inputDataSet
+ .flatMap(assignWindowsFunction)
+ .name(context.getOutput(transform).getName())
+ .returns(resultTypeInfo);
+
+ context.setOutputDataSet(context.getOutput(transform), resultDataSet);
+ }
+ }
+
+ private static class GroupByKeyTranslatorBatch<K, InputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> {
+
+ @Override
+ public void translateNode(
+ GroupByKey<K, InputT> transform,
+ FlinkBatchTranslationContext context) {
+
+ // for now, this is copied from the Combine.PerKey translater. Once we have the new runner API
+ // we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn
+
+ DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+
+ Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
+ new Concatenate<InputT>().asKeyedFn();
+
+ KvCoder<K, InputT> inputCoder =
+ (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+
+ Coder<List<InputT>> accumulatorCoder;
+
+ try {
+ accumulatorCoder =
+ combineFn.getAccumulatorCoder(
+ context.getInput(transform).getPipeline().getCoderRegistry(),
+ inputCoder.getKeyCoder(),
+ inputCoder.getValueCoder());
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(e);
+ }
+
+ WindowingStrategy<?, ?> windowingStrategy =
+ context.getInput(transform).getWindowingStrategy();
+
+ TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+ windowingStrategy.getWindowFn().windowCoder()));
+
+
+ Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+ inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
+
+ FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
+ FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
+
+ if (windowingStrategy.getWindowFn().isNonMerging()) {
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> boundedStrategy =
+ (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+ partialReduceFunction = new FlinkPartialReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+ context.getPipelineOptions());
+
+ reduceFunction = new FlinkReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+ context.getPipelineOptions());
+
+ } else {
+ if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+ throw new UnsupportedOperationException(
+ "Merging WindowFn with windows other than IntervalWindow are not supported.");
+ }
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, IntervalWindow> intervalStrategy =
+ (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
+
+ partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
+ combineFn,
+ intervalStrategy,
+ Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+ context.getPipelineOptions());
+
+ reduceFunction = new FlinkMergingReduceFunction<>(
+ combineFn,
+ intervalStrategy,
+ Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+ context.getPipelineOptions());
+ }
+
+ // Partially GroupReduce the values into the intermediate format AccumT (combine)
+ GroupCombineOperator<
+ WindowedValue<KV<K, InputT>>,
+ WindowedValue<KV<K, List<InputT>>>> groupCombine =
+ new GroupCombineOperator<>(
+ inputGrouping,
+ partialReduceTypeInfo,
+ partialReduceFunction,
+ "GroupCombine: " + transform.getName());
+
+ Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
+ groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder()));
+
+ // Fully reduce the values and create output format VO
+ GroupReduceOperator<
+ WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
+ new GroupReduceOperator<>(
+ intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+
+ }
+
+ }
+
+ private static class ReshuffleTranslatorBatch<K, InputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> {
+
+ @Override
+ public void translateNode(
+ Reshuffle<K, InputT> transform,
+ FlinkBatchTranslationContext context) {
+
+ DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+
+ context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance());
+
+ }
+
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
+ * is expected to crash!
+ *
+ * <p>This is copied from the dataflow runner code.
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
+
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
+
+
+ private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Combine.PerKey<K, InputT, OutputT>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void translateNode(
+ Combine.PerKey<K, InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+
+ CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
+ (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+
+ KvCoder<K, InputT> inputCoder =
+ (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+
+ Coder<AccumT> accumulatorCoder;
+
+ try {
+ accumulatorCoder =
+ combineFn.getAccumulatorCoder(
+ context.getInput(transform).getPipeline().getCoderRegistry(),
+ inputCoder.getKeyCoder(),
+ inputCoder.getValueCoder());
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(e);
+ }
+
+ WindowingStrategy<?, ?> windowingStrategy =
+ context.getInput(transform).getWindowingStrategy();
+
+ TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
+ context.getTypeInfo(
+ KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+ windowingStrategy);
+
+ Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+ inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
+
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput: transform.getSideInputs()) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
+
+ if (windowingStrategy.getWindowFn().isNonMerging()) {
+ WindowingStrategy<?, BoundedWindow> boundedStrategy =
+ (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+ FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
+ new FlinkPartialReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
+ new FlinkReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ // Partially GroupReduce the values into the intermediate format AccumT (combine)
+ GroupCombineOperator<
+ WindowedValue<KV<K, InputT>>,
+ WindowedValue<KV<K, AccumT>>> groupCombine =
+ new GroupCombineOperator<>(
+ inputGrouping,
+ partialReduceTypeInfo,
+ partialReduceFunction,
+ "GroupCombine: " + transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), groupCombine, context);
+
+ TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
+ groupCombine.groupBy(new KvKeySelector<AccumT, K>(inputCoder.getKeyCoder()));
+
+ // Fully reduce the values and create output format OutputT
+ GroupReduceOperator<
+ WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+ new GroupReduceOperator<>(
+ intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+
+ } else {
+ if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+ throw new UnsupportedOperationException(
+ "Merging WindowFn with windows other than IntervalWindow are not supported.");
+ }
+
+ // for merging windows we can't to a pre-shuffle combine step since
+ // elements would not be in their correct windows for side-input access
+
+ WindowingStrategy<?, IntervalWindow> intervalStrategy =
+ (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
+
+ FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
+ new FlinkMergingNonShuffleReduceFunction<>(
+ combineFn,
+ intervalStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<WindowedValue<KV<K, InputT>>> grouping =
+ inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
+
+ // Fully reduce the values and create output format OutputT
+ GroupReduceOperator<
+ WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+ new GroupReduceOperator<>(
+ grouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
+
+
+ }
+ }
+
+ private static void rejectSplittable(DoFn<?, ?> doFn) {
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+ if (signature.processElement().isSplittable()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s does not currently support splittable DoFn: %s",
+ FlinkRunner.class.getSimpleName(), doFn));
+ }
+ }
+
+ private static class ParDoTranslatorBatch<InputT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ ParDo.MultiOutput<InputT, OutputT>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void translateNode(
+ ParDo.MultiOutput<InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getFn();
+ rejectSplittable(doFn);
+ DataSet<WindowedValue<InputT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+
+ Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
+
+ Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+ // put the main output at index 0, FlinkMultiOutputDoFnFunction expects this
+ outputMap.put(transform.getMainOutputTag(), 0);
+ int count = 1;
+ for (TupleTag<?> tag : outputs.keySet()) {
+ if (!outputMap.containsKey(tag)) {
+ outputMap.put(tag, count++);
+ }
+ }
+
+ // assume that the windowing strategy is the same for all outputs
+ WindowingStrategy<?, ?> windowingStrategy = null;
+
+ // collect all output Coders and create a UnionCoder for our tagged outputs
+ List<Coder<?>> outputCoders = Lists.newArrayList();
+ for (PValue taggedValue : outputs.values()) {
+ checkState(
+ taggedValue instanceof PCollection,
+ "Within ParDo, got a non-PCollection output %s of type %s",
+ taggedValue,
+ taggedValue.getClass().getSimpleName());
+ PCollection<?> coll = (PCollection<?>) taggedValue;
+ outputCoders.add(coll.getCoder());
+ windowingStrategy = coll.getWindowingStrategy();
+ }
+
+ if (windowingStrategy == null) {
+ throw new IllegalStateException("No outputs defined.");
+ }
+
+ UnionCoder unionCoder = UnionCoder.of(outputCoders);
+
+ TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ unionCoder,
+ windowingStrategy.getWindowFn().windowCoder()));
+
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput: sideInputs) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
+
+ SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>, ?> outputDataSet;
+ DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+ if (signature.stateDeclarations().size() > 0
+ || signature.timerDeclarations().size() > 0) {
+
+ // Based on the fact that the signature is stateful, DoFnSignatures ensures
+ // that it is also keyed
+ KvCoder<?, InputT> inputCoder =
+ (KvCoder<?, InputT>) context.getInput(transform).getCoder();
+
+ FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>(
+ (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
+ outputMap, transform.getMainOutputTag()
+ );
+
+ Grouping<WindowedValue<InputT>> grouping =
+ inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
+
+ outputDataSet =
+ new GroupReduceOperator(grouping, typeInformation, doFnWrapper, transform.getName());
+
+ } else {
+ FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper =
+ new FlinkDoFnFunction(
+ doFn,
+ windowingStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions(),
+ outputMap,
+ transform.getMainOutputTag());
+
+ outputDataSet = new MapPartitionOperator<>(
+ inputDataSet, typeInformation,
+ doFnWrapper, transform.getName());
+
+ }
+
+ transformSideInputs(sideInputs, outputDataSet, context);
+
+ for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+ pruneOutput(
+ outputDataSet,
+ context,
+ outputMap.get(output.getKey()),
+ (PCollection) output.getValue());
+ }
+
+ }
+
+ /**
+ * Extracts the elements that belong to one tagged output from the
+ * union-encoded multi-output DataSet and registers the pruned result
+ * as the translated DataSet for {@code collection}.
+ *
+ * @param taggedDataSet the union-encoded output DataSet of the ParDo
+ * @param context translation context used for type info and output registration
+ * @param integerTag the union index identifying the desired output tag
+ * @param collection the Beam output PCollection being materialized
+ */
+ private <T> void pruneOutput(
+ DataSet<WindowedValue<RawUnionValue>> taggedDataSet,
+ FlinkBatchTranslationContext context,
+ int integerTag,
+ PCollection<T> collection) {
+ TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
+
+ // forwards only the RawUnionValues whose union tag equals integerTag
+ FlinkMultiOutputPruningFunction<T> pruningFunction =
+ new FlinkMultiOutputPruningFunction<>(integerTag);
+
+ FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
+ new FlatMapOperator<>(
+ taggedDataSet,
+ outputType,
+ pruningFunction,
+ collection.getName());
+
+ context.setOutputDataSet(collection, pruningOperator);
+ }
+ }
+
+ /**
+ * Translates {@link Flatten.PCollections} by unioning the input DataSets.
+ * An empty input list is handled with a never-emitting dummy source so that
+ * downstream operations still have an upstream operator.
+ */
+ private static class FlattenPCollectionTranslatorBatch<T>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Flatten.PCollections<T>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void translateNode(
+ Flatten.PCollections<T> transform,
+ FlinkBatchTranslationContext context) {
+
+ Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
+ DataSet<WindowedValue<T>> result = null;
+
+ if (allInputs.isEmpty()) {
+
+ // create an empty dummy source to satisfy downstream operations
+ // we cannot create an empty source in Flink, therefore we have to
+ // add the flatMap that simply never forwards the single element
+ DataSource<String> dummySource =
+ context.getExecutionEnvironment().fromElements("dummy");
+ result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
+ @Override
+ public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
+ // never return anything
+ }
+ }).returns(
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ // unchecked cast is safe: the flatMap above emits no elements,
+ // so the coder is never used to encode a real value
+ (Coder<T>) VoidCoder.of(),
+ GlobalWindow.Coder.INSTANCE)));
+ } else {
+ // union all input DataSets pairwise; Beam guarantees compatible coders here
+ for (PValue taggedPc : allInputs.values()) {
+ checkArgument(
+ taggedPc instanceof PCollection,
+ "Got non-PCollection input to flatten: %s of type %s",
+ taggedPc,
+ taggedPc.getClass().getSimpleName());
+ PCollection<T> collection = (PCollection<T>) taggedPc;
+ DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
+ if (result == null) {
+ result = current;
+ } else {
+ result = result.union(current);
+ }
+ }
+ }
+
+ // insert a dummy filter, there seems to be a bug in Flink
+ // that produces duplicate elements after the union in some cases
+ // if we don't
+ result = result.filter(new FilterFunction<WindowedValue<T>>() {
+ @Override
+ public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
+ return true;
+ }
+ }).name("UnionFixFilter");
+ context.setOutputDataSet(context.getOutput(transform), result);
+ }
+ }
+
+ /**
+ * Translates {@link View.CreatePCollectionView} by registering the input
+ * DataSet as the side-input (broadcast) DataSet for the created view.
+ * No new Flink operator is produced.
+ */
+ private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ View.CreatePCollectionView<ElemT, ViewT>> {
+
+ @Override
+ public void translateNode(
+ View.CreatePCollectionView<ElemT, ViewT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<ElemT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+
+ PCollectionView<ViewT> input = transform.getView();
+
+ // consumers of the view later retrieve this DataSet as a broadcast set
+ context.setSideInputDataSet(input, inputDataSet);
+ }
+ }
+
+ /**
+ * Wires each side input's DataSet into {@code outputDataSet} as a Flink
+ * broadcast set, keyed by the side input's internal tag id so the runtime
+ * function can look it up by the same id.
+ *
+ * @param sideInputs the side inputs of the transform being translated
+ * @param outputDataSet the Flink operator that consumes the side inputs
+ * @param context translation context holding the registered side-input DataSets
+ */
+ private static void transformSideInputs(
+ List<PCollectionView<?>> sideInputs,
+ SingleInputUdfOperator<?, ?, ?> outputDataSet,
+ FlinkBatchTranslationContext context) {
+ // get corresponding Flink broadcast DataSets
+ for (PCollectionView<?> input : sideInputs) {
+ DataSet<?> broadcastSet = context.getSideInputDataSet(input);
+ outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
+ }
+ }
+
+ private FlinkBatchTransformTranslators() {}
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
new file mode 100644
index 0000000..98dd0fb
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Helper for {@link FlinkBatchPipelineTranslator} and translators in
+ * {@link FlinkBatchTransformTranslators}.
+ */
+class FlinkBatchTranslationContext {
+
+ // translated DataSet for each already-translated PValue
+ private final Map<PValue, DataSet<?>> dataSets;
+ // broadcast (side-input) DataSet registered for each PCollectionView
+ private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
+
+ /**
+ * For keeping track about which DataSets don't have a successor. We
+ * need to terminate these with a discarding sink because the Beam
+ * model allows dangling operations.
+ */
+ private final Map<PValue, DataSet<?>> danglingDataSets;
+
+ private final ExecutionEnvironment env;
+ private final PipelineOptions options;
+
+ // the transform currently being translated; source of getInput(s)/getOutput(s)
+ private AppliedPTransform<?, ?, ?> currentTransform;
+
+ // ------------------------------------------------------------------------
+
+ public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
+ this.env = env;
+ this.options = options;
+ this.dataSets = new HashMap<>();
+ this.broadcastDataSets = new HashMap<>();
+
+ this.danglingDataSets = new HashMap<>();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Returns the DataSets that were produced but never consumed by a successor. */
+ public Map<PValue, DataSet<?>> getDanglingDataSets() {
+ return danglingDataSets;
+ }
+
+ public ExecutionEnvironment getExecutionEnvironment() {
+ return env;
+ }
+
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ /**
+ * Returns the translated DataSet for {@code value} and marks it as
+ * consumed (removes it from the dangling set).
+ */
+ @SuppressWarnings("unchecked")
+ public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
+ // assume that the DataSet is used as an input if retrieved here
+ danglingDataSets.remove(value);
+ return (DataSet<WindowedValue<T>>) dataSets.get(value);
+ }
+
+ /**
+ * Registers {@code set} as the translation of {@code value}; first
+ * registration wins, later calls for the same PValue are ignored.
+ */
+ public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
+ if (!dataSets.containsKey(value)) {
+ dataSets.put(value, set);
+ danglingDataSets.put(value, set);
+ }
+ }
+
+ /**
+ * Sets the AppliedPTransform which carries input/output.
+ * @param currentTransform the transform currently being translated
+ */
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+ this.currentTransform = currentTransform;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
+ return (DataSet<T>) broadcastDataSets.get(value);
+ }
+
+ /**
+ * Registers {@code set} as the broadcast DataSet backing {@code value};
+ * first registration wins, later calls for the same view are ignored.
+ */
+ public <ViewT, ElemT> void setSideInputDataSet(
+ PCollectionView<ViewT> value,
+ DataSet<WindowedValue<ElemT>> set) {
+ if (!broadcastDataSets.containsKey(value)) {
+ broadcastDataSets.put(value, set);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+ return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy());
+ }
+
+ /** Builds Flink TypeInformation from a Beam coder plus the window coder. */
+ @SuppressWarnings("unchecked")
+ public <T> TypeInformation<WindowedValue<T>> getTypeInfo(
+ Coder<T> coder,
+ WindowingStrategy<?, ?> windowingStrategy) {
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ coder,
+ windowingStrategy.getWindowFn().windowCoder());
+
+ return new CoderTypeInformation<>(windowedValueCoder);
+ }
+
+ // NOTE(review): the transform parameter is unused; inputs are read from
+ // currentTransform, so setCurrentTransform must have been called first.
+ Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
+ return currentTransform.getInputs();
+ }
+
+ /** Returns the single input of currentTransform; fails if there is not exactly one. */
+ @SuppressWarnings("unchecked")
+ <T extends PValue> T getInput(PTransform<T, ?> transform) {
+ return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
+ }
+
+ // NOTE(review): transform parameter unused here as well, see getInputs.
+ Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
+ return currentTransform.getOutputs();
+ }
+
+ /** Returns the single output of currentTransform; fails if there is not exactly one. */
+ @SuppressWarnings("unchecked")
+ <T extends PValue> T getOutput(PTransform<?, T> transform) {
+ return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
new file mode 100644
index 0000000..bf4395f
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+
+/**
+ * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink.
+ * In detached execution, results and job execution are currently unavailable.
+ */
+public class FlinkDetachedRunnerResult implements PipelineResult {
+
+ FlinkDetachedRunnerResult() {}
+
+ /** Always UNKNOWN: a detached submission cannot query the job's state. */
+ @Override
+ public State getState() {
+ return State.UNKNOWN;
+ }
+
+ /** Always throws: accumulators are not retrievable for detached executions. */
+ @Override
+ public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+ throws AggregatorRetrievalException {
+ throw new AggregatorRetrievalException(
+ "Accumulators can't be retrieved for detached Job executions.",
+ new UnsupportedOperationException());
+ }
+
+ @Override
+ public MetricResults metrics() {
+ throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+ }
+
+ @Override
+ public State cancel() throws IOException {
+ throw new UnsupportedOperationException("Cancelling is not yet supported.");
+ }
+
+ // the detached result cannot observe completion, so both waitUntilFinish
+ // overloads return immediately with UNKNOWN instead of blocking
+ @Override
+ public State waitUntilFinish() {
+ return State.UNKNOWN;
+ }
+
+ @Override
+ public State waitUntilFinish(Duration duration) {
+ return State.UNKNOWN;
+ }
+
+ @Override
+ public String toString() {
+ return "FlinkDetachedRunnerResult{}";
+ }
+}